[fix](schema change) resolve the use count check of source logical column (#33932)

Fix error like:
```
8# google::LogMessageFatal::~LogMessageFatal() in /mnt/hdd01/ci/master-deploy/be/lib/doris_be
 9# doris::vectorized::Block::clear_column_data(int) in /mnt/hdd01/ci/master-deploy/be/lib/doris_be
10# doris::vectorized::ParquetReader::get_next_block(doris::vectorized::Block*, unsigned long*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/format/parquet/vparquet_reader.cpp:514
11# doris::vectorized::VFileScanner::_get_block_impl(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vfile_scanner.cpp:333
12# doris::vectorized::VScanner::get_block(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vscanner.cpp:132
13# doris::vectorized::VScanner::get_block_after_projects(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /home/zcp/repo_center/doris_master/doris/be/src/vec/exec/scan/vscanner.cpp:99
```

Because source logical column is the destination logical column if logical converter is consistent. Previously, the reference of column was reset after the conversion was completed, but if an EOF occurred, it was returned in advance, but EOF is not a true error.
```
if (_logical_converter->is_consistent()) {
            // If logical converter is consistent, _src_logical_column is the final destination column,
            // other components will check the use count
            _src_logical_column.reset();
}
```
This commit is contained in:
Ashin Gau
2024-04-22 11:34:13 +08:00
committed by yiguolei
parent ff5a4cb5b9
commit c631f4f8a8
4 changed files with 50 additions and 46 deletions

View File

@ -108,19 +108,6 @@ ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type s
}
// remove the old cached data
_cached_src_physical_column->assume_mutable()->clear();
if (is_consistent()) {
if (dst_logical_type->is_nullable()) {
auto doris_nullable_column = const_cast<ColumnNullable*>(
static_cast<const ColumnNullable*>(dst_logical_column.get()));
_src_logical_column = ColumnNullable::create(
_cached_src_physical_column, doris_nullable_column->get_null_map_column_ptr());
} else {
_src_logical_column = _cached_src_physical_column;
}
} else {
_src_logical_column = _logical_converter->get_column(src_logical_type, dst_logical_column,
dst_logical_type);
}
if (dst_logical_type->is_nullable()) {
// In order to share null map between parquet converted src column and dst column to avoid copying. It is very tricky that will

View File

@ -160,7 +160,6 @@ class PhysicalToLogicalConverter {
protected:
ColumnPtr _cached_src_physical_column = nullptr;
DataTypePtr _cached_src_physical_type = nullptr;
ColumnPtr _src_logical_column = nullptr;
std::unique_ptr<converter::ColumnTypeConverter> _logical_converter = nullptr;
std::string _error_msg;
@ -179,20 +178,39 @@ public:
PhysicalToLogicalConverter() = default;
virtual ~PhysicalToLogicalConverter() = default;
virtual Status physical_convert(ColumnPtr& src_physical_col) { return Status::OK(); }
Status convert(ColumnPtr& src_physical_col, MutableColumnPtr& dst_logical_col) {
// convert physical values and save in _src_logical_column
RETURN_IF_ERROR(physical_convert(src_physical_col));
RETURN_IF_ERROR(_logical_converter->convert(_src_logical_column, dst_logical_col));
if (_logical_converter->is_consistent()) {
// If logical converter is consistent, _src_logical_column is the final destination column,
// other components will check the use count
_src_logical_column.reset();
}
virtual Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) {
return Status::OK();
}
Status convert(ColumnPtr& src_physical_col, TypeDescriptor src_logical_type,
const DataTypePtr& dst_logical_type, ColumnPtr& dst_logical_col,
bool is_dict_filter) {
if (is_dict_filter) {
src_logical_type = TypeDescriptor(PrimitiveType::TYPE_INT);
}
if (is_consistent() && _logical_converter->is_consistent()) {
return Status::OK();
}
ColumnPtr src_logical_column;
if (is_consistent()) {
if (dst_logical_type->is_nullable()) {
auto doris_nullable_column = const_cast<ColumnNullable*>(
static_cast<const ColumnNullable*>(dst_logical_col.get()));
src_logical_column =
ColumnNullable::create(_cached_src_physical_column,
doris_nullable_column->get_null_map_column_ptr());
} else {
src_logical_column = _cached_src_physical_column;
}
} else {
src_logical_column = _logical_converter->get_column(src_logical_type, dst_logical_col,
dst_logical_type);
}
RETURN_IF_ERROR(physical_convert(src_physical_col, src_logical_column));
auto converted_column = dst_logical_col->assume_mutable();
return _logical_converter->convert(src_logical_column, converted_column);
}
virtual ColumnPtr get_physical_column(tparquet::Type::type src_physical_type,
TypeDescriptor src_logical_type,
ColumnPtr& dst_logical_column,
@ -227,7 +245,7 @@ public:
bool support() override { return false; }
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
return Status::InternalError("Unsupported physical to logical type: {}", _error_msg);
}
};
@ -235,11 +253,11 @@ public:
// for tinyint, smallint
template <PrimitiveType IntPrimitiveType>
class LittleIntPhysicalConverter : public PhysicalToLogicalConverter {
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
using DstCppType = typename PrimitiveTypeTraits<IntPrimitiveType>::CppType;
using DstColumnType = typename PrimitiveTypeTraits<IntPrimitiveType>::ColumnType;
ColumnPtr from_col = remove_nullable(src_physical_col);
MutableColumnPtr to_col = remove_nullable(_src_logical_column)->assume_mutable();
MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable();
size_t rows = from_col->size();
// always comes from tparquet::Type::INT32
@ -262,9 +280,9 @@ private:
public:
FixedSizeBinaryConverter(int type_length) : _type_length(type_length) {}
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
ColumnPtr from_col = remove_nullable(src_physical_col);
MutableColumnPtr to_col = remove_nullable(_src_logical_column)->assume_mutable();
MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable();
auto* src_data = static_cast<const ColumnUInt8*>(from_col.get());
size_t length = src_data->size();
@ -294,9 +312,9 @@ class FixedSizeToDecimal : public PhysicalToLogicalConverter {
public:
FixedSizeToDecimal(int32_t type_length) : _type_length(type_length) {}
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
ColumnPtr src_col = remove_nullable(src_physical_col);
MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable();
MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();
#define M(FixedTypeLength, ValueCopyType) \
case FixedTypeLength: \
@ -372,10 +390,10 @@ private:
template <typename DecimalType, DecimalScaleParams::ScaleType ScaleType>
class StringToDecimal : public PhysicalToLogicalConverter {
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
using ValueCopyType = DecimalType::NativeType;
ColumnPtr src_col = remove_nullable(src_physical_col);
MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable();
MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();
size_t rows = src_col->size();
DecimalScaleParams& scale_params = _convert_params->decimal_scale;
@ -413,10 +431,10 @@ class StringToDecimal : public PhysicalToLogicalConverter {
template <typename NumberType, typename DecimalType, DecimalScaleParams::ScaleType ScaleType>
class NumberToDecimal : public PhysicalToLogicalConverter {
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
using ValueCopyType = DecimalType::NativeType;
ColumnPtr src_col = remove_nullable(src_physical_col);
MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable();
MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();
size_t rows = src_col->size();
auto* src_data =
@ -441,9 +459,9 @@ class NumberToDecimal : public PhysicalToLogicalConverter {
};
class Int32ToDate : public PhysicalToLogicalConverter {
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
ColumnPtr src_col = remove_nullable(src_physical_col);
MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable();
MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();
size_t rows = src_col->size();
size_t start_idx = dst_col->size();
@ -463,9 +481,9 @@ class Int32ToDate : public PhysicalToLogicalConverter {
};
struct Int64ToTimestamp : public PhysicalToLogicalConverter {
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
ColumnPtr src_col = remove_nullable(src_physical_col);
MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable();
MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();
size_t rows = src_col->size();
size_t start_idx = dst_col->size();
@ -487,9 +505,9 @@ struct Int64ToTimestamp : public PhysicalToLogicalConverter {
};
struct Int96toTimestamp : public PhysicalToLogicalConverter {
Status physical_convert(ColumnPtr& src_physical_col) override {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
ColumnPtr src_col = remove_nullable(src_physical_col);
MutableColumnPtr dst_col = remove_nullable(_src_logical_column)->assume_mutable();
MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();
size_t rows = src_col->size() / sizeof(ParquetInt96);
auto& src_data = static_cast<const ColumnVector<Int8>*>(src_col.get())->get_data();

View File

@ -565,8 +565,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
}
} while (false);
auto converted_column = doris_column->assume_mutable();
return _converter->convert(resolved_column, converted_column);
return _converter->convert(resolved_column, _field_schema->type, type, doris_column,
is_dict_filter);
}
Status ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_reader,

View File

@ -268,8 +268,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
chunk_reader.decode_values(data_column, resolved_type, run_length_map, false));
}
}
auto converted_column = doris_column->assume_mutable();
return _converter->convert(src_column, converted_column);
return _converter->convert(src_column, field_schema->type, data_type, doris_column, false);
}
// Only the unit test depend on this, but it is wrong, should not use TTupleDesc to create tuple desc, not