diff --git a/be/src/vec/exec/format/parquet/parquet_column_convert.cpp b/be/src/vec/exec/format/parquet/parquet_column_convert.cpp index 9afc394843..57f1f54b7b 100644 --- a/be/src/vec/exec/format/parquet/parquet_column_convert.cpp +++ b/be/src/vec/exec/format/parquet/parquet_column_convert.cpp @@ -108,19 +108,6 @@ ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type s } // remove the old cached data _cached_src_physical_column->assume_mutable()->clear(); - if (is_consistent()) { - if (dst_logical_type->is_nullable()) { - auto doris_nullable_column = const_cast( - static_cast(dst_logical_column.get())); - _src_logical_column = ColumnNullable::create( - _cached_src_physical_column, doris_nullable_column->get_null_map_column_ptr()); - } else { - _src_logical_column = _cached_src_physical_column; - } - } else { - _src_logical_column = _logical_converter->get_column(src_logical_type, dst_logical_column, - dst_logical_type); - } if (dst_logical_type->is_nullable()) { // In order to share null map between parquet converted src column and dst column to avoid copying. It is very tricky that will diff --git a/be/src/vec/exec/format/parquet/parquet_column_convert.h b/be/src/vec/exec/format/parquet/parquet_column_convert.h index ede6e42648..bc6bc23232 100644 --- a/be/src/vec/exec/format/parquet/parquet_column_convert.h +++ b/be/src/vec/exec/format/parquet/parquet_column_convert.h @@ -160,7 +160,6 @@ class PhysicalToLogicalConverter { protected: ColumnPtr _cached_src_physical_column = nullptr; DataTypePtr _cached_src_physical_type = nullptr; - ColumnPtr _src_logical_column = nullptr; std::unique_ptr _logical_converter = nullptr; std::string _error_msg; @@ -179,20 +178,39 @@ public: PhysicalToLogicalConverter() = default; virtual ~PhysicalToLogicalConverter() = default; - virtual Status physical_convert(ColumnPtr& src_physical_col) { return Status::OK(); } - - Status convert(ColumnPtr& src_physical_col, MutableColumnPtr& dst_logical_col) { - // convert physical values and save in _src_logical_column - RETURN_IF_ERROR(physical_convert(src_physical_col)); - RETURN_IF_ERROR(_logical_converter->convert(_src_logical_column, dst_logical_col)); - if (_logical_converter->is_consistent()) { - // If logical converter is consistent, _src_logical_column is the final destination column, - // other components will check the use count - _src_logical_column.reset(); - } + virtual Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) { return Status::OK(); } + Status convert(ColumnPtr& src_physical_col, TypeDescriptor src_logical_type, + const DataTypePtr& dst_logical_type, ColumnPtr& dst_logical_col, + bool is_dict_filter) { + if (is_dict_filter) { + src_logical_type = TypeDescriptor(PrimitiveType::TYPE_INT); + } + if (is_consistent() && _logical_converter->is_consistent()) { + return Status::OK(); + } + ColumnPtr src_logical_column; + if (is_consistent()) { + if (dst_logical_type->is_nullable()) { + auto doris_nullable_column = const_cast( + static_cast(dst_logical_col.get())); + src_logical_column = + ColumnNullable::create(_cached_src_physical_column, + doris_nullable_column->get_null_map_column_ptr()); + } else { + src_logical_column = _cached_src_physical_column; + } + } else { + src_logical_column = _logical_converter->get_column(src_logical_type, dst_logical_col, + dst_logical_type); + } + RETURN_IF_ERROR(physical_convert(src_physical_col, src_logical_column)); + auto converted_column = dst_logical_col->assume_mutable(); + return _logical_converter->convert(src_logical_column, converted_column); + } + virtual ColumnPtr get_physical_column(tparquet::Type::type src_physical_type, TypeDescriptor src_logical_type, ColumnPtr& dst_logical_column, @@ -227,7 +245,7 @@ public: bool support() override { return false; } - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { return Status::InternalError("Unsupported physical to logical type: {}", _error_msg); } }; @@ -235,11 +253,11 @@ public: // for tinyint, smallint template class LittleIntPhysicalConverter : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using DstCppType = typename PrimitiveTypeTraits::CppType; using DstColumnType = typename PrimitiveTypeTraits::ColumnType; ColumnPtr from_col = remove_nullable(src_physical_col); - MutableColumnPtr to_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = from_col->size(); // always comes from tparquet::Type::INT32 @@ -262,9 +280,9 @@ private: public: FixedSizeBinaryConverter(int type_length) : _type_length(type_length) {} - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr from_col = remove_nullable(src_physical_col); - MutableColumnPtr to_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable(); auto* src_data = static_cast(from_col.get()); size_t length = src_data->size(); @@ -294,9 +312,9 @@ class FixedSizeToDecimal : public PhysicalToLogicalConverter { public: FixedSizeToDecimal(int32_t type_length) : _type_length(type_length) {} - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); #define M(FixedTypeLength, ValueCopyType) \ case FixedTypeLength: \ @@ -372,10 +390,10 @@ private: template class StringToDecimal : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using ValueCopyType = DecimalType::NativeType; ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); DecimalScaleParams& scale_params = _convert_params->decimal_scale; @@ -413,10 +431,10 @@ class StringToDecimal : public PhysicalToLogicalConverter { template class NumberToDecimal : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { using ValueCopyType = DecimalType::NativeType; ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); auto* src_data = @@ -441,9 +459,9 @@ class NumberToDecimal : public PhysicalToLogicalConverter { }; class Int32ToDate : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); size_t start_idx = dst_col->size(); @@ -463,9 +481,9 @@ class Int32ToDate : public PhysicalToLogicalConverter { }; struct Int64ToTimestamp : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size(); size_t start_idx = dst_col->size(); @@ -487,9 +505,9 @@ struct Int64ToTimestamp : public PhysicalToLogicalConverter { }; struct Int96toTimestamp : public PhysicalToLogicalConverter { - Status physical_convert(ColumnPtr& src_physical_col) override { + Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override { ColumnPtr src_col = remove_nullable(src_physical_col); - MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable(); + MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable(); size_t rows = src_col->size() / sizeof(ParquetInt96); auto& src_data = static_cast*>(src_col.get())->get_data(); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 71b103d7fd..2a3782ab44 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -565,8 +565,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr } } while (false); - auto converted_column = doris_column->assume_mutable(); - return _converter->convert(resolved_column, converted_column); + return _converter->convert(resolved_column, _field_schema->type, type, doris_column, + is_dict_filter); } Status ArrayColumnReader::init(std::unique_ptr element_reader, diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 0ad5f83c1d..4dfbd6a380 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -268,8 +268,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column chunk_reader.decode_values(data_column, resolved_type, run_length_map, false)); } } - auto converted_column = doris_column->assume_mutable(); - return _converter->convert(src_column, converted_column); + return _converter->convert(src_column, field_schema->type, data_type, doris_column, false); } // Only the unit test depend on this, but it is wrong, should not use TTupleDesc to create tuple desc, not