From c631f4f8a8820efbf634edcfa45595fe2fcffe73 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Mon, 22 Apr 2024 11:34:13 +0800 Subject: [PATCH] [fix](schema change) resolve the use count check of source logical column (#33932) Fix error like: ``` 8# google::LogMessageFatal::~LogMessageFatal() in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 9# doris::vectorized::Block::clear_column_data(int) in /mnt/hdd01/ci/master-deploy/be/lib/doris_be 10# doris::vectorized::ParquetReader::get_next_block(doris::vectorized::Block*, unsigned long*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/format/parquet/vparquet_reader.cpp:514 11# doris::vectorized::VFileScanner::_get_block_impl(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vfile_scanner.cpp:333 12# doris::vectorized::VScanner::get_block(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vscanner.cpp:132 13# doris::vectorized::VScanner::get_block_after_projects(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vscanner.cpp:99 ``` Because source logical column is the destination logical column if logical converter is consistent. Previously, the reference of column was reset after the conversion was completed, but if an EOF occurred, it was returned in advance, but EOF is not a true error. ``` if (_logical_converter->is_consistent()) { // If logical converter is consistent, _src_logical_column is the final destination column, // other components will check the use count _src_logical_column.reset(); } ``` --- .../format/parquet/parquet_column_convert.cpp | 13 ---- .../format/parquet/parquet_column_convert.h | 76 ++++++++++++------- .../format/parquet/vparquet_column_reader.cpp | 4 +- .../vec/exec/parquet/parquet_thrift_test.cpp | 3 +- 4 files changed, 50 insertions(+), 46 deletions(-) diff --git a/be/src/vec/exec/format/parquet/parquet_column_convert.cpp b/be/src/vec/exec/format/parquet/parquet_column_convert.cpp index 9afc394843..57f1f54b7b 100644 --- a/be/src/vec/exec/format/parquet/parquet_column_convert.cpp +++ b/be/src/vec/exec/format/parquet/parquet_column_convert.cpp @@ -108,19 +108,6 @@ ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type s } // remove the old cached data _cached_src_physical_column->assume_mutable()->clear(); - if (is_consistent()) { - if (dst_logical_type->is_nullable()) { - auto doris_nullable_column = const_cast( - static_cast(dst_logical_column.get())); - _src_logical_column = ColumnNullable::create( - _cached_src_physical_column, doris_nullable_column->get_null_map_column_ptr()); - } else { - _src_logical_column = _cached_src_physical_column; - } - } else { - _src_logical_column = _logical_converter->get_column(src_logical_type, dst_logical_column, - dst_logical_type); - } if (dst_logical_type->is_nullable()) { // In order to share null map between parquet converted src column and dst column to avoid copying. It is very tricky that will diff --git a/be/src/vec/exec/format/parquet/parquet_column_convert.h b/be/src/vec/exec/format/parquet/parquet_column_convert.h index ede6e42648..bc6bc23232 100644 --- a/be/src/vec/exec/format/parquet/parquet_column_convert.h +++ b/be/src/vec/exec/format/parquet/parquet_column_convert.h @@ -160,7 +160,6 @@ class PhysicalToLogicalConverter { protected: ColumnPtr _cached_src_physical_column = nullptr; DataTypePtr _cached_src_physical_type = nullptr; - ColumnPtr _src_logical_column = nullptr; std::unique_ptr _logical_converter = nullptr; std::string _error_msg; @@ -179,20 +178,39 @@ public: PhysicalToLogicalConverter() = default; virtual ~PhysicalToLogicalConverter() = default; - virtual Status physical_convert(ColumnPtr& src_physical_col) { return Status::OK(); } - - Status convert(ColumnPtr& src_physical_col, MutableColumnPtr& dst_logical_col) { - // convert physical values and save in _src_logical_column - RETURN_IF_ERROR(physical_convert(src_physical_col)); - RETURN_IF_ERROR(_logical_converter->convert(_src_logical_column, dst_logical_col)); - if (_logical_converter->is_consistent()) { - // If logical converter is consistent, _src_logical_column is the final destination column, - // other components will check the use count - _src_logical_column.reset(); - } + virtual Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) { return Status::OK(); } + Status convert(ColumnPtr& src_physical_col, TypeDescriptor src_logical_type, + const DataTypePtr& dst_logical_type, ColumnPtr& dst_logical_col, + bool is_dict_filter) { + if (is_dict_filter) { + src_logical_type = TypeDescriptor(PrimitiveType::TYPE_INT); + } + if (is_consistent() && _logical_converter->is_consistent()) { + return Status::OK(); + } + ColumnPtr src_logical_column; + if (is_consistent()) { + if (dst_logical_type->is_nullable()) { + auto doris_nullable_column = const_cast( + static_cast(dst_logical_col.get())); + src_logical_column = + ColumnNullable::create(_cached_src_physical_column, + doris_nullable_column->get_null_map_column_ptr()); + } else { + src_logical_column = _cached_src_physical_column; + } + } else { + src_logical_column = _logical_converter->get_column(src_logical_type, dst_logical_col, + dst_logical_type); + } + RETURN_IF_ERROR(physical_convert(src_physical_col, src_logical_column)); + auto converted_column = dst_logical_col->assume_mutable(); + return _logical_converter->convert(src_logical_column, converted_column); + } + virtual ColumnPtr get_physical_column(tparquet::Type::type src_physical_type, TypeDescriptor src_logical_type, ColumnPtr& dst_logical_column, @@ -227,7 +245,7 @@ public: bool support() override { return false; } - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { return Status::InternalError("Unsupported physical to logical type: {}", _error_msg); } }; @@ -235,11 +253,11 @@ public: // for tinyint, smallint template class LittleIntPhysicalConverter : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using DstCppType = typename PrimitiveTypeTraits::CppType; using DstColumnType = typename PrimitiveTypeTraits::ColumnType; ColumnPtr from_col = remove_nullable(src_physical_col); - MutableColumnPtr to_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = from_col->size(); // always comes from tparquet::Type::INT32 @@ -262,9 +280,9 @@ private: public: FixedSizeBinaryConverter(int type_length) : _type_length(type_length) {} - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr from_col = remove_nullable(src_physical_col); - MutableColumnPtr to_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable(); auto* src_data = static_cast(from_col.get()); size_t length = src_data->size(); @@ -294,9 +312,9 @@ class FixedSizeToDecimal : public PhysicalToLogicalConverter { public: FixedSizeToDecimal(int32_t type_length) : _type_length(type_length) {} - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); #define M(FixedTypeLength, ValueCopyType) \ case FixedTypeLength: \ @@ -372,10 +390,10 @@ private: template class StringToDecimal : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using ValueCopyType = DecimalType::NativeType; ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); DecimalScaleParams& scale_params = _convert_params->decimal_scale; @@ -413,10 +431,10 @@ class StringToDecimal : public PhysicalToLogicalConverter { template class NumberToDecimal : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using ValueCopyType = DecimalType::NativeType; ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); auto* src_data = @@ -441,9 +459,9 @@ class NumberToDecimal : public PhysicalToLogicalConverter { }; class Int32ToDate : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); size_t start_idx = dst_col->size(); @@ -463,9 +481,9 @@ class Int32ToDate : public PhysicalToLogicalConverter { }; struct Int64ToTimestamp : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); size_t start_idx = dst_col->size(); @@ -487,9 +505,9 @@ struct Int64ToTimestamp : public PhysicalToLogicalConverter { }; struct Int96toTimestamp : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size() / sizeof(ParquetInt96); auto& src_data = static_cast*>(src_col.get())->get_data(); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 71b103d7fd..2a3782ab44 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -565,8 +565,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr } } while (false); - auto converted_column = doris_column->assume_mutable(); - return _converter->convert(resolved_column, converted_column); + return _converter->convert(resolved_column, _field_schema->type, type, doris_column, + is_dict_filter); } Status ArrayColumnReader::init(std::unique_ptr element_reader, diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 0ad5f83c1d..4dfbd6a380 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -268,8 +268,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column chunk_reader.decode_values(data_column, resolved_type, run_length_map, false)); } } - auto converted_column = doris_column->assume_mutable(); - return _converter->convert(src_column, converted_column); + return _converter->convert(src_column, field_schema->type, data_type, doris_column, false); } // Only the unit test depend on this, but it is wrong, should not use TTupleDesc to create tuple desc, not