[bugfix](iceberg) Fix random core dump when writing an Iceberg partitioned table, for 2.1 (#39808) (#39569) (#39832)

## Proposed changes

bp: #39808 #39569 (backport to branch-2.1)

Root cause: the writer kept the transformed partition columns in the member `Block _transformed_block` and appended to it on every `write()` call without ever clearing it, so from the second batch onward the partition-column positions resolved to stale columns whose row counts no longer matched the batch being written, occasionally crashing the BE. The fix makes the transformed block a local variable of `write()`, passes it explicitly to `_create_partition_writer()` / `_get_partition_data()` as a pointer, and uses `nullptr` to select the unpartitioned path.
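For illustration, here is a minimal, self-contained sketch of the failure mode. The types are simplified stand-ins, not the real `vectorized::Block`/`Column` API:

```cpp
#include <cstddef>
#include <cstdio>
#include <utility>
#include <vector>

// Simplified stand-ins for the real vectorized types.
struct Column {
    std::vector<long> values;
};

struct Block {
    std::vector<Column> columns;
    void insert(Column c) { columns.push_back(std::move(c)); }
    const Column& get_by_position(std::size_t i) const { return columns.at(i); }
};

// Before the fix: the transformed block is a member, so every write()
// appends the batch's partition columns after the previous batch's
// columns. Position 0 then resolves to a stale column whose row count
// may be smaller than the batch currently being written.
struct BuggyWriter {
    Block _transformed_block;
    long partition_value_at(const Column& src, std::size_t row) {
        _transformed_block.insert(src); // never cleared between calls
        return _transformed_block.get_by_position(0).values.at(row);
    }
};

// After the fix: the transformed block is local to the call, so column
// positions and row counts always refer to the current batch.
struct FixedWriter {
    long partition_value_at(const Column& src, std::size_t row) {
        Block transformed_block;
        transformed_block.insert(src);
        return transformed_block.get_by_position(0).values.at(row);
    }
};

int main() {
    Column small {{7}};
    Column big {{1, 2, 3, 4}};

    FixedWriter good;
    std::printf("%ld\n", good.partition_value_at(small, 0)); // 7
    std::printf("%ld\n", good.partition_value_at(big, 3));   // 4: block rebuilt per call

    BuggyWriter bad;
    std::printf("%ld\n", bad.partition_value_at(small, 0)); // 7
    // A second, larger batch still reads the stale one-row column:
    // row 3 is out of range -- the "random core" (at() throws here;
    // the BE dereferenced raw column memory instead).
    // bad.partition_value_at(big, 3);
}
```

The new regression test drives exactly this pattern: one overwrite statement dispatching rows across ten partitions of a LIST-partitioned table.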
wuwenchi, committed by GitHub, 2024-08-23 17:19:48 +08:00
commit c40246efa9 (parent 8f15efdbb8)
4 changed files with 132 additions and 33 deletions

### viceberg_table_writer.cpp

```diff
@@ -125,7 +125,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) {
         auto writer_iter = _partitions_to_writers.find("");
         if (writer_iter == _partitions_to_writers.end()) {
             try {
-                writer = _create_partition_writer(output_block, -1);
+                writer = _create_partition_writer(nullptr, -1);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
@@ -141,7 +141,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) {
             }
             _partitions_to_writers.erase(writer_iter);
             try {
-                writer = _create_partition_writer(output_block, -1, &file_name,
+                writer = _create_partition_writer(nullptr, -1, &file_name,
                                                   file_name_index + 1);
             } catch (doris::Exception& e) {
                 return e.to_status();
@@ -160,21 +160,21 @@ Status VIcebergTableWriter::write(vectorized::Block& block) {
     }
 
     {
+        Block transformed_block;
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
-        _transformed_block.reserve(_iceberg_partition_columns.size());
+        transformed_block.reserve(_iceberg_partition_columns.size());
         for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-            _transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
+            transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
                     output_block, iceberg_partition_columns.source_idx()));
         }
         for (int i = 0; i < output_block.rows(); ++i) {
             std::optional<PartitionData> partition_data;
             try {
-                partition_data = _get_partition_data(_transformed_block, i);
+                partition_data = _get_partition_data(&transformed_block, i);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
             std::string partition_name;
+            DCHECK(partition_data.has_value());
             try {
                 partition_name = _partition_to_path(partition_data.value());
             } catch (doris::Exception& e) {
@@ -185,7 +185,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) {
                                        const std::string* file_name, int file_name_index,
                                        std::shared_ptr<VIcebergPartitionWriter>& writer_ptr) -> Status {
             try {
-                auto writer = _create_partition_writer(output_block, position, file_name,
+                auto writer = _create_partition_writer(&transformed_block, position, file_name,
                                                        file_name_index);
                 RETURN_IF_ERROR(writer->open(_state, _profile));
                 IColumn::Filter filter(output_block.rows(), 0);
@@ -341,30 +341,27 @@ std::vector<std::string> VIcebergTableWriter::_partition_values(
 }
 
 std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_writer(
-        vectorized::Block& block, int position, const std::string* file_name, int file_name_index) {
+        vectorized::Block* transformed_block, int position, const std::string* file_name,
+        int file_name_index) {
     auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
-    std::optional<PartitionData> partition_data;
-    partition_data = _get_partition_data(_transformed_block, position);
-    std::string partition_path;
     std::vector<std::string> partition_values;
-    if (partition_data.has_value()) {
-        partition_path = _partition_to_path(partition_data.value());
-        partition_values = _partition_values(partition_data.value());
-    }
     const std::string& output_path = iceberg_table_sink.output_path;
     std::string write_path;
     std::string original_write_path;
     std::string target_path;
-    if (partition_path.empty()) {
-        original_write_path = iceberg_table_sink.original_output_path;
-        target_path = output_path;
-        write_path = output_path;
-    } else {
+    if (transformed_block != nullptr) {
+        PartitionData partition_data = _get_partition_data(transformed_block, position);
+        std::string partition_path = _partition_to_path(partition_data);
+        partition_values = _partition_values(partition_data);
         original_write_path =
                 fmt::format("{}/{}", iceberg_table_sink.original_output_path, partition_path);
         target_path = fmt::format("{}/{}", output_path, partition_path);
         write_path = fmt::format("{}/{}", output_path, partition_path);
+    } else {
+        original_write_path = iceberg_table_sink.original_output_path;
+        target_path = output_path;
+        write_path = output_path;
     }
 
     VIcebergPartitionWriter::WriteInfo write_info = {
@@ -387,18 +384,15 @@ std::shared_ptr<VIcebergPartitionWriter> VIcebergTableWriter::_create_partition_
             iceberg_table_sink.hadoop_config);
 }
 
-std::optional<PartitionData> VIcebergTableWriter::_get_partition_data(
-        vectorized::Block& transformed_block, int position) {
-    if (_iceberg_partition_columns.empty()) {
-        return std::nullopt;
-    }
+PartitionData VIcebergTableWriter::_get_partition_data(vectorized::Block* transformed_block,
+                                                       int position) {
+    DCHECK(!_iceberg_partition_columns.empty());
     std::vector<std::any> values;
     values.reserve(_iceberg_partition_columns.size());
     int column_idx = 0;
     for (auto& iceberg_partition_column : _iceberg_partition_columns) {
         const vectorized::ColumnWithTypeAndName& partition_column =
-                transformed_block.get_by_position(column_idx);
+                transformed_block->get_by_position(column_idx);
         const TypeDescriptor& result_type =
                 iceberg_partition_column.partition_column_transform().get_result_type();
         auto value = _get_iceberg_partition_value(result_type, partition_column, position);
```
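A condensed, hypothetical sketch of the new calling convention follows. Only the `nullptr`-means-unpartitioned contract and the DCHECK come from the patch; the types and free functions are simplified stand-ins:

```cpp
#include <cassert>
#include <iostream>
#include <string>

struct Block {};
struct PartitionData {};

// The caller now decides up front: a null block means "unpartitioned",
// so the partition-data helper no longer needs an optional escape hatch.
PartitionData get_partition_data(const Block* transformed_block, int position) {
    assert(transformed_block != nullptr); // stands in for the new DCHECK
    (void)position;
    return PartitionData {};
}

std::string create_partition_writer(const Block* transformed_block, int position) {
    if (transformed_block == nullptr) {
        return "unpartitioned writer"; // old code keyed this off an empty partition path
    }
    (void)get_partition_data(transformed_block, position);
    return "partitioned writer for row " + std::to_string(position);
}

int main() {
    Block b;
    std::cout << create_partition_writer(nullptr, -1) << '\n'; // unpartitioned table
    std::cout << create_partition_writer(&b, 0) << '\n';       // partitioned table
}
```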

### viceberg_table_writer.h

```diff
@@ -97,10 +97,10 @@ private:
     std::vector<std::string> _partition_values(const doris::iceberg::StructLike& data);
 
     std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
-            vectorized::Block& block, int position, const std::string* file_name = nullptr,
-            int file_name_index = 0);
+            vectorized::Block* transformed_block, int position,
+            const std::string* file_name = nullptr, int file_name_index = 0);
 
-    std::optional<PartitionData> _get_partition_data(vectorized::Block& block, int position);
+    PartitionData _get_partition_data(vectorized::Block* block, int position);
 
     std::any _get_iceberg_partition_value(const TypeDescriptor& type_desc,
                                           const ColumnWithTypeAndName& partition_column,
@@ -127,8 +127,6 @@ private:
     VExprContextSPtrs _write_output_vexpr_ctxs;
 
-    Block _transformed_block;
-
     size_t _row_count = 0;
 
     // profile counters
```

### test_iceberg_overwrite_with_wrong_partition.out (new file)

```diff
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !qt01 --
+2450841	2450841
+2450841	2450841
+2450842	2450842
+2450842	2450842
+2450843	2450843
+2450843	2450843
+2450844	2450844
+2450844	2450844
+2450845	2450845
+2450845	2450845
+2450846	2450846
+2450846	2450846
+2450847	2450847
+2450847	2450847
+2450848	2450848
+2450848	2450848
+2450849	2450849
+2450849	2450849
+2450850	2450850
+2450850	2450850
```

### test_iceberg_overwrite_with_wrong_partition.groovy (new file)

```diff
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_overwrite_with_wrong_partition", "p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String tb1 = "tb_dst";
+    String tb2 = "tb_src";
+
+    try {
+        String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+        String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String catalog_name = "test_iceberg_overwrite_with_wrong_partition"
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='rest',
+                'uri' = 'http://${externalEnvIp}:${rest_port}',
+                "s3.access_key" = "admin",
+                "s3.secret_key" = "password",
+                "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+                "s3.region" = "us-east-1"
+        );"""
+
+        sql """ switch ${catalog_name} """
+        sql """ use multi_catalog """
+
+        sql """ drop table if exists ${tb1} """
+        sql """ drop table if exists ${tb2} """
+
+        sql """
+            create table ${tb1} (
+                id bigint,
+                id2 bigint
+            ) PARTITION BY LIST(id2)();
+        """
+        sql """
+            create table ${tb2} (
+                id bigint,
+                id2 bigint
+            );
+        """
+
+        sql """ insert into ${tb2} values (2450841,2450841), (2450842,2450842); """
+        sql """ insert into ${tb2} values (2450843,2450843), (2450844,2450844); """
+        sql """ insert into ${tb2} values (2450845,2450845), (2450846,2450846); """
+        sql """ insert into ${tb2} values (2450847,2450847), (2450848,2450848); """
+        sql """ insert into ${tb2} values (2450849,2450849), (2450850,2450850); """
+        sql """ insert into ${tb2} values (2450841,2450841), (2450842,2450842); """
+        sql """ insert into ${tb2} values (2450843,2450843), (2450844,2450844); """
+        sql """ insert into ${tb2} values (2450845,2450845), (2450846,2450846); """
+        sql """ insert into ${tb2} values (2450847,2450847), (2450848,2450848); """
+        sql """ insert into ${tb2} values (2450849,2450849), (2450850,2450850); """
+
+        sql """ insert overwrite table ${tb1} (id, id2) select id, id2 from ${tb2} where id2 >= 2450841 AND id2 < 2450851; """
+
+        order_qt_qt01 """ select * from ${tb1} """
+    } finally {
+        sql """ drop table if exists ${tb1} """
+        sql """ drop table if exists ${tb2} """
+    }
+}
```