From a856bebf71a37dd06cd078ee68bd3a899932a2cb Mon Sep 17 00:00:00 2001 From: wjhh2008 Date: Mon, 18 Nov 2024 03:44:50 +0000 Subject: [PATCH] support parquet bool type --- .../table/ob_parquet_table_row_iter.cpp | 68 +++++++++++++++++-- .../engine/table/ob_parquet_table_row_iter.h | 1 + 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/src/sql/engine/table/ob_parquet_table_row_iter.cpp b/src/sql/engine/table/ob_parquet_table_row_iter.cpp index 1c7b16faf..1420ce9fe 100644 --- a/src/sql/engine/table/ob_parquet_table_row_iter.cpp +++ b/src/sql/engine/table/ob_parquet_table_row_iter.cpp @@ -375,7 +375,9 @@ int ObParquetTableRowIterator::next_file() } } LOG_WARN("not supported type", K(ret), K(file_column_exprs_.at(i)->datum_meta_), - K(ObString(p_type.length(), p_type.data())), "rep_level", col_desc->max_repetition_level()); + K(ObString(p_type.length(), p_type.data())), + K(col_desc->physical_type()), + "rep_level", col_desc->max_repetition_level()); LOG_USER_ERROR(OB_EXTERNAL_FILE_COLUMN_TYPE_MISMATCH, p_type.c_str(), ob_type); } else { column_indexs_.at(i) = column_index; @@ -446,7 +448,13 @@ ObParquetTableRowIterator::DataLoader::LOAD_FUNC ObParquetTableRowIterator::Data const parquet::LogicalType* log_type = col_desc->logical_type().get(); parquet::Type::type phy_type = col_desc->physical_type(); bool no_log_type = log_type->is_none(); - if ((no_log_type || log_type->is_int()) && ob_is_integer_type(datum_type.type_)) { + if (no_log_type && parquet::Type::BOOLEAN == phy_type) { + if (ob_is_decimal_int_tc(datum_type.type_)) { + func = &DataLoader::load_decimal_any_col; + } else if (ob_is_integer_type(datum_type.type_)) { + func = &DataLoader::load_bool_to_int64_vec; + } + } else if ((no_log_type || log_type->is_int()) && ob_is_integer_type(datum_type.type_)) { //convert parquet int storing as int32/int64 to // ObTinyIntType/ObSmallIntType/ObMediumIntType/ObInt32Type/ObIntType using int64_t memory layout // ObUTinyIntType/ObUSmallIntType/ObUMediumIntType/ObUInt32Type/ObUInt64Type using uint64_t memory layout @@ -795,6 +803,22 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col() OZ (to_numeric(i + row_offset_, values.at(j++))); } } + } else if (reader_->descr()->physical_type() == parquet::Type::type::BOOLEAN) { + ObArrayWrap values; + OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_)); + if (OB_SUCC(ret)) { + row_count_ = static_cast(reader_)->ReadBatch( + batch_size_, def_levels_buf_.get_data(), rep_levels_buf_.get_data(), + values.get_data(), &values_cnt); + } + int j = 0; + for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { + if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { + file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_); + } else { + OZ (to_numeric(i + row_offset_, values.at(j++))); + } + } } else if (reader_->descr()->physical_type() == parquet::Type::Type::FIXED_LEN_BYTE_ARRAY) { ObArrayWrap values; int32_t fixed_length = reader_->descr()->type_length(); @@ -967,10 +991,10 @@ int ObParquetTableRowIterator::DataLoader::load_int32_to_int64_vec() int64_t values_cnt = 0; ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); int16_t max_def_level = reader_->descr()->max_definition_level(); - ObFixedLengthBase *int32_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + ObFixedLengthBase *int64_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); ObArrayWrap values; - CK (VEC_FIXED == int32_vec->get_format()); + CK (VEC_FIXED == int64_vec->get_format()); OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_)); if (OB_SUCC(ret)) { row_count_ = static_cast(reader_)->ReadBatch( @@ -983,9 +1007,41 @@ int ObParquetTableRowIterator::DataLoader::load_int32_to_int64_vec() int j = 0; for (int i = 0; i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - int32_vec->set_null(i + row_offset_); + int64_vec->set_null(i + row_offset_); } else { - int32_vec->set_int(i + row_offset_, values.at(j++)); + int64_vec->set_int(i + row_offset_, values.at(j++)); + } + } + } + } + return ret; +} + +int ObParquetTableRowIterator::DataLoader::load_bool_to_int64_vec() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + int16_t max_def_level = reader_->descr()->max_definition_level(); + ObFixedLengthBase *int64_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + ObArrayWrap values; + + CK (VEC_FIXED == int64_vec->get_format()); + OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_)); + if (OB_SUCC(ret)) { + row_count_ = static_cast(reader_)->ReadBatch( + batch_size_, def_levels_buf_.get_data(), rep_levels_buf_.get_data(), + values.get_data(), &values_cnt); + if (OB_UNLIKELY(values_cnt > row_count_)) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("repeated data not support"); + } else { + int j = 0; + for (int i = 0; i < row_count_; i++) { + if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { + int64_vec->set_null(i + row_offset_); + } else { + int64_vec->set_int(i + row_offset_, values.at(j++)); } } } diff --git a/src/sql/engine/table/ob_parquet_table_row_iter.h b/src/sql/engine/table/ob_parquet_table_row_iter.h index 4e0de58ae..c6f419e24 100644 --- a/src/sql/engine/table/ob_parquet_table_row_iter.h +++ b/src/sql/engine/table/ob_parquet_table_row_iter.h @@ -179,6 +179,7 @@ private: int load_int64_to_int64_vec(); int load_int32_to_int64_vec(); int load_int32_to_int32_vec(); + int load_bool_to_int64_vec(); int load_string_col(); int load_fixed_string_col(); int load_decimal_any_col();