From e6034575d0eeb42ebbebe797c07f4a7dd2ac2434 Mon Sep 17 00:00:00 2001 From: doubleMocha <1085615789@qq.com> Date: Mon, 9 Dec 2024 09:15:20 +0000 Subject: [PATCH] [orc] fix some bugs of reading orc file --- src/objit/src/ob_llvm_di_helper.cpp | 6 +- .../engine/table/ob_orc_table_row_iter.cpp | 126 +++++++++++++++--- src/sql/engine/table/ob_orc_table_row_iter.h | 10 +- 3 files changed, 117 insertions(+), 25 deletions(-) diff --git a/src/objit/src/ob_llvm_di_helper.cpp b/src/objit/src/ob_llvm_di_helper.cpp index e411a1b47..8b7e5737b 100644 --- a/src/objit/src/ob_llvm_di_helper.cpp +++ b/src/objit/src/ob_llvm_di_helper.cpp @@ -425,9 +425,9 @@ ObLLVMDIHelper::ObDIBasicTypeAttr ObLLVMDIHelper::basic_type_[common::ObMaxType] {"null", 0, 0, 0}, {"tinyint", 8, 8, llvm::dwarf::DW_ATE_signed}, {"smallint", 16, 16, llvm::dwarf::DW_ATE_signed}, - {"mediumint", 32, 32, llvm::dwarf::DW_ATE_signed}, - {"int", 32, 32, llvm::dwarf::DW_ATE_signed}, - {"bigint", 64, 64, llvm::dwarf::DW_ATE_signed}, + {"mediumint", 32, 32, llvm::dwarf::DW_ATE_signed}, + {"int", 32, 32, llvm::dwarf::DW_ATE_signed}, + {"bigint", 64, 64, llvm::dwarf::DW_ATE_signed}, {"tinyint unsigned", 8, 8, llvm::dwarf::DW_ATE_unsigned}, {"smallint unsigned", 16, 16, llvm::dwarf::DW_ATE_unsigned}, {"mediumint unsigned", 32, 32, llvm::dwarf::DW_ATE_unsigned}, diff --git a/src/sql/engine/table/ob_orc_table_row_iter.cpp b/src/sql/engine/table/ob_orc_table_row_iter.cpp index 232ee6a33..c0704dd7f 100644 --- a/src/sql/engine/table/ob_orc_table_row_iter.cpp +++ b/src/sql/engine/table/ob_orc_table_row_iter.cpp @@ -373,7 +373,8 @@ int ObOrcTableRowIterator::next_file() } } catch(const std::exception& e) { if (OB_SUCC(ret)) { - ret = OB_ERR_UNEXPECTED; + ret = OB_INVALID_EXTERNAL_FILE; + LOG_USER_ERROR(OB_INVALID_EXTERNAL_FILE, e.what()); LOG_WARN("unexpected error", K(ret), "Info", e.what()); } } catch(...) { @@ -387,23 +388,23 @@ int ObOrcTableRowIterator::next_file() return ret; } -bool ObOrcTableRowIterator::DataLoader::is_orc_read_utc() +bool ObOrcTableRowIterator::DataLoader::is_orc_read_utc(const orc::Type *type) { - return true; + // TIMESTAMP_INSTANT 是utc时间 + return type->getKind() == orc::TypeKind::TIMESTAMP_INSTANT; } bool ObOrcTableRowIterator::DataLoader::is_ob_type_store_utc(const ObDatumMeta &meta) { - return ObTimestampType == meta.type_ || ObDateType == meta.type_ - || (lib::is_mysql_mode() && ObDateTimeType == meta.type_) - || (lib::is_mysql_mode() && ObTimeType == meta.type_); + return (lib::is_mysql_mode() && ObTimestampType == meta.type_) + || (lib::is_oracle_mode() && ObTimestampLTZType == meta.type_); } int64_t ObOrcTableRowIterator::DataLoader::calc_tz_adjust_us() { int64_t res = 0; int ret = OB_SUCCESS; - bool is_utc_src = is_orc_read_utc(); + bool is_utc_src = is_orc_read_utc(col_type_); bool is_utc_dst = is_ob_type_store_utc(file_col_expr_->datum_meta_); if (is_utc_src != is_utc_dst) { int32_t tmp_offset = 0; @@ -463,6 +464,8 @@ ObOrcTableRowIterator::DataLoader::LOAD_FUNC ObOrcTableRowIterator::DataLoader:: if (temp_obj.get_tight_data_len() < orc_data_len) { func = NULL; } + } else if (ob_is_year_tc(datum_type.type_) && orc::TypeKind::INT == type_kind) { + func = &DataLoader::load_year_vec; } else if (ob_is_string_tc(datum_type.type_) || ob_is_enum_or_set_type(datum_type.type_)) { //convert orc enum/string to enum/string vector switch (type_kind) { @@ -482,7 +485,8 @@ ObOrcTableRowIterator::DataLoader::LOAD_FUNC ObOrcTableRowIterator::DataLoader:: } else if (ob_is_number_or_decimal_int_tc(datum_type.type_)) { //convert orc int storing as int32/int64 to number/decimal vector if (type_kind == orc::TypeKind::DECIMAL) { - if (col_desc->getPrecision() != datum_type.precision_ || col_desc->getScale() != datum_type.scale_) { + if (col_desc->getPrecision() != ((datum_type.precision_ == -1) ? 38 : datum_type.precision_) + || col_desc->getScale() != datum_type.scale_) { func = NULL; } else if (col_desc->getPrecision() == 0 || col_desc->getPrecision() > 18) { func = &DataLoader::load_dec128_vec; @@ -514,7 +518,8 @@ ObOrcTableRowIterator::DataLoader::LOAD_FUNC ObOrcTableRowIterator::DataLoader:: ob_is_datetime(datum_type.type_) || ob_is_time_tc(datum_type.type_) || ObTimestampType == datum_type.type_ || - ObTimestampLTZType == datum_type.type_) { + ObTimestampLTZType == datum_type.type_ || + ObTimestampNanoType == datum_type.type_) { func = &DataLoader::load_timestamp_vec; } break; @@ -587,16 +592,30 @@ int ObOrcTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) ret = OB_ERR_UNEXPECTED; LOG_WARN("row reader is null", K(ret)); } else { - orc_batch_ = row_reader_->createRowBatch(capacity); - if (!orc_batch_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("create orc row batch failed", K(ret)); - } else if (row_reader_->next(*orc_batch_)) { - //ok - } else { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("read next batch failed", K(ret), K(state_.cur_stripe_read_row_count_), K(state_.cur_stripe_row_count_)); + try { + orc_batch_ = row_reader_->createRowBatch(capacity); + if (!orc_batch_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("create orc row batch failed", K(ret)); + } else if (row_reader_->next(*orc_batch_)) { + //ok + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read next batch failed", K(ret), K(state_.cur_stripe_read_row_count_), K(state_.cur_stripe_row_count_)); + } + } catch(const std::exception& e) { + if (OB_SUCC(ret)) { + ret = OB_ORC_READ_ERROR; + LOG_USER_ERROR(OB_ORC_READ_ERROR, e.what()); + LOG_WARN("unexpected error", K(ret), "Info", e.what()); + } + } catch(...) { + if (OB_SUCC(ret)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected error", K(ret)); + } } + //load vec data from parquet file to file column expr for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) { if (OB_ISNULL(file_column_exprs_.at(i))) { @@ -605,15 +624,18 @@ int ObOrcTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) } else { int idx = -1; int64_t col_id = -1; + const orc::Type *col_type = nullptr; ObDataAccessPathExtraInfo *data_access_info = static_cast(file_column_exprs_.at(i)->extra_info_); CK (data_access_info != nullptr); CK (data_access_info->data_access_path_.ptr() != nullptr); CK (data_access_info->data_access_path_.length() != 0); OZ (name_to_id_.get_refactored(ObString(data_access_info->data_access_path_.length(), data_access_info->data_access_path_.ptr()), col_id)); + + OZ (id_to_type_.get_refactored(col_id, col_type)); ObArray idxs; OZ (get_data_column_batch_idxs(&row_reader_->getSelectedType(), col_id, idxs)); - DataLoader loader(eval_ctx, file_column_exprs_.at(i), orc_batch_, capacity, idxs, read_count); + DataLoader loader(eval_ctx, file_column_exprs_.at(i), orc_batch_, capacity, idxs, read_count, col_type); OZ (file_column_exprs_.at(i)->init_vector_for_write( eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_)); OZ (loader.load_data_for_col(load_funcs_.at(i))); @@ -785,6 +807,72 @@ int ObOrcTableRowIterator::DataLoader::load_int64_vec() return ret; } +int ObOrcTableRowIterator::DataLoader::load_year_vec() +{ + int ret = OB_SUCCESS; + int64_t values_cnt = 0; + ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_); + CK (OB_NOT_NULL(file_col_expr_)); + if (OB_SUCC(ret)) { + ObFixedLengthBase *int32_vec = static_cast(file_col_expr_->get_vector(eval_ctx_)); + CK (OB_NOT_NULL(int32_vec)); + CK (VEC_FIXED == int32_vec->get_format()); + if (OB_SUCC(ret)) { + if (OB_ISNULL(batch_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("read orc next batch failed", K(ret)); + } else { + row_count_ = batch_->numElements; + orc::StructVectorBatch *root = dynamic_cast(batch_.get()); + if (OB_ISNULL(root)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc column vector batch failed", K(ret)); + } + CK (root->fields.size() > 0); + CK (idxs_.count() > 0); + if (OB_SUCC(ret)) { + orc::StructVectorBatch *cb = root; + for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) { + CK (root->fields.size() > idxs_.at(i)); + if (OB_SUCC(ret)) { + cb = dynamic_cast(cb->fields[idxs_.at(i)]); + CK (cb != nullptr); + } + } + } + if (OB_SUCC(ret)) { + orc::LongVectorBatch *long_batch = dynamic_cast(root->fields[idxs_.at(idxs_.count() - 1)]); + if (OB_ISNULL(long_batch)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dynamic cast orc type failed", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) { + if (long_batch->hasNulls) { + CK (OB_NOT_NULL(long_batch->notNull.data())); + if (OB_SUCC(ret)) { + const uint8_t* valid_bytes = reinterpret_cast(long_batch->notNull.data()) + i; + if (OB_ISNULL(valid_bytes)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("orc not null batch valid bytes is null", K(ret)); + } else if (*valid_bytes == 1) { + int32_vec->set_year(i, long_batch->data[i]); + } else { + int32_vec->set_null(i); + } + } + } else { + int32_vec->set_year(i, long_batch->data[i]); + } + } + } + } + } + } + } + LOG_DEBUG("load year vec", K(ret), K(row_count_)); + return ret; +} + int ObOrcTableRowIterator::DataLoader::load_int32_vec() { int ret = OB_SUCCESS; diff --git a/src/sql/engine/table/ob_orc_table_row_iter.h b/src/sql/engine/table/ob_orc_table_row_iter.h index ecd17ffa5..171715915 100644 --- a/src/sql/engine/table/ob_orc_table_row_iter.h +++ b/src/sql/engine/table/ob_orc_table_row_iter.h @@ -128,19 +128,22 @@ private: std::unique_ptr &batch, const int64_t batch_size, const ObIArray &idxs, - int64_t &row_count): + int64_t &row_count, + const orc::Type *col_type): eval_ctx_(eval_ctx), file_col_expr_(file_col_expr), batch_(batch), batch_size_(batch_size), idxs_(idxs), - row_count_(row_count) + row_count_(row_count), + col_type_(col_type) {} typedef int (DataLoader::*LOAD_FUNC)(); static LOAD_FUNC select_load_function(const ObDatumMeta &datum_type, const orc::Type &type); int load_data_for_col(LOAD_FUNC &func); int load_string_col(); + int load_year_vec(); int load_int32_vec(); int load_int64_vec(); int load_timestamp_vec(); @@ -150,7 +153,7 @@ private: int load_dec128_vec(); int load_dec64_vec(); - bool is_orc_read_utc(); + bool is_orc_read_utc(const orc::Type *type); bool is_ob_type_store_utc(const ObDatumMeta &meta); int64_t calc_tz_adjust_us(); @@ -160,6 +163,7 @@ private: const int64_t batch_size_; const ObIArray &idxs_; int64_t &row_count_; + const orc::Type *col_type_; }; private: int next_file();