support parquet bool type

This commit is contained in:
wjhh2008 2024-11-18 03:44:50 +00:00 committed by ob-robot
parent 92016c007e
commit a856bebf71
2 changed files with 63 additions and 6 deletions

View File

@ -375,7 +375,9 @@ int ObParquetTableRowIterator::next_file()
}
}
LOG_WARN("not supported type", K(ret), K(file_column_exprs_.at(i)->datum_meta_),
K(ObString(p_type.length(), p_type.data())), "rep_level", col_desc->max_repetition_level());
K(ObString(p_type.length(), p_type.data())),
K(col_desc->physical_type()),
"rep_level", col_desc->max_repetition_level());
LOG_USER_ERROR(OB_EXTERNAL_FILE_COLUMN_TYPE_MISMATCH, p_type.c_str(), ob_type);
} else {
column_indexs_.at(i) = column_index;
@ -446,7 +448,13 @@ ObParquetTableRowIterator::DataLoader::LOAD_FUNC ObParquetTableRowIterator::Data
const parquet::LogicalType* log_type = col_desc->logical_type().get();
parquet::Type::type phy_type = col_desc->physical_type();
bool no_log_type = log_type->is_none();
if ((no_log_type || log_type->is_int()) && ob_is_integer_type(datum_type.type_)) {
if (no_log_type && parquet::Type::BOOLEAN == phy_type) {
if (ob_is_decimal_int_tc(datum_type.type_)) {
func = &DataLoader::load_decimal_any_col;
} else if (ob_is_integer_type(datum_type.type_)) {
func = &DataLoader::load_bool_to_int64_vec;
}
} else if ((no_log_type || log_type->is_int()) && ob_is_integer_type(datum_type.type_)) {
//convert parquet int storing as int32/int64 to
// ObTinyIntType/ObSmallIntType/ObMediumIntType/ObInt32Type/ObIntType using int64_t memory layout
// ObUTinyIntType/ObUSmallIntType/ObUMediumIntType/ObUInt32Type/ObUInt64Type using uint64_t memory layout
@ -795,6 +803,22 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col()
OZ (to_numeric(i + row_offset_, values.at(j++)));
}
}
} else if (reader_->descr()->physical_type() == parquet::Type::type::BOOLEAN) {
ObArrayWrap<bool> values;
OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_));
if (OB_SUCC(ret)) {
row_count_ = static_cast<parquet::BoolReader*>(reader_)->ReadBatch(
batch_size_, def_levels_buf_.get_data(), rep_levels_buf_.get_data(),
values.get_data(), &values_cnt);
}
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_);
} else {
OZ (to_numeric(i + row_offset_, values.at(j++)));
}
}
} else if (reader_->descr()->physical_type() == parquet::Type::Type::FIXED_LEN_BYTE_ARRAY) {
ObArrayWrap<parquet::FixedLenByteArray> values;
int32_t fixed_length = reader_->descr()->type_length();
@ -967,10 +991,10 @@ int ObParquetTableRowIterator::DataLoader::load_int32_to_int64_vec()
int64_t values_cnt = 0;
ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_);
int16_t max_def_level = reader_->descr()->max_definition_level();
ObFixedLengthBase *int32_vec = static_cast<ObFixedLengthBase *>(file_col_expr_->get_vector(eval_ctx_));
ObFixedLengthBase *int64_vec = static_cast<ObFixedLengthBase *>(file_col_expr_->get_vector(eval_ctx_));
ObArrayWrap<int32_t> values;
CK (VEC_FIXED == int32_vec->get_format());
CK (VEC_FIXED == int64_vec->get_format());
OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_));
if (OB_SUCC(ret)) {
row_count_ = static_cast<parquet::Int32Reader*>(reader_)->ReadBatch(
@ -983,9 +1007,41 @@ int ObParquetTableRowIterator::DataLoader::load_int32_to_int64_vec()
int j = 0;
for (int i = 0; i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
int32_vec->set_null(i + row_offset_);
int64_vec->set_null(i + row_offset_);
} else {
int32_vec->set_int(i + row_offset_, values.at(j++));
int64_vec->set_int(i + row_offset_, values.at(j++));
}
}
}
}
return ret;
}
int ObParquetTableRowIterator::DataLoader::load_bool_to_int64_vec()
{
int ret = OB_SUCCESS;
int64_t values_cnt = 0;
ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_);
int16_t max_def_level = reader_->descr()->max_definition_level();
ObFixedLengthBase *int64_vec = static_cast<ObFixedLengthBase *>(file_col_expr_->get_vector(eval_ctx_));
ObArrayWrap<bool> values;
CK (VEC_FIXED == int64_vec->get_format());
OZ (values.allocate_array(tmp_alloc_g.get_allocator(), batch_size_));
if (OB_SUCC(ret)) {
row_count_ = static_cast<parquet::BoolReader*>(reader_)->ReadBatch(
batch_size_, def_levels_buf_.get_data(), rep_levels_buf_.get_data(),
values.get_data(), &values_cnt);
if (OB_UNLIKELY(values_cnt > row_count_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("repeated data not support");
} else {
int j = 0;
for (int i = 0; i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
int64_vec->set_null(i + row_offset_);
} else {
int64_vec->set_int(i + row_offset_, values.at(j++));
}
}
}

View File

@ -179,6 +179,7 @@ private:
int load_int64_to_int64_vec();
int load_int32_to_int64_vec();
int load_int32_to_int32_vec();
int load_bool_to_int64_vec();
int load_string_col();
int load_fixed_string_col();
int load_decimal_any_col();