[orc] Fix several bugs in reading ORC files
This commit is contained in:
parent
a6eeca581e
commit
e6034575d0
@ -425,9 +425,9 @@ ObLLVMDIHelper::ObDIBasicTypeAttr ObLLVMDIHelper::basic_type_[common::ObMaxType]
|
||||
{"null", 0, 0, 0},
|
||||
{"tinyint", 8, 8, llvm::dwarf::DW_ATE_signed},
|
||||
{"smallint", 16, 16, llvm::dwarf::DW_ATE_signed},
|
||||
{"mediumint", 32, 32, llvm::dwarf::DW_ATE_signed},
|
||||
{"int", 32, 32, llvm::dwarf::DW_ATE_signed},
|
||||
{"bigint", 64, 64, llvm::dwarf::DW_ATE_signed},
|
||||
{"mediumint", 32, 32, llvm::dwarf::DW_ATE_signed},
|
||||
{"int", 32, 32, llvm::dwarf::DW_ATE_signed},
|
||||
{"bigint", 64, 64, llvm::dwarf::DW_ATE_signed},
|
||||
{"tinyint unsigned", 8, 8, llvm::dwarf::DW_ATE_unsigned},
|
||||
{"smallint unsigned", 16, 16, llvm::dwarf::DW_ATE_unsigned},
|
||||
{"mediumint unsigned", 32, 32, llvm::dwarf::DW_ATE_unsigned},
|
||||
|
@ -373,7 +373,8 @@ int ObOrcTableRowIterator::next_file()
|
||||
}
|
||||
} catch(const std::exception& e) {
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
ret = OB_INVALID_EXTERNAL_FILE;
|
||||
LOG_USER_ERROR(OB_INVALID_EXTERNAL_FILE, e.what());
|
||||
LOG_WARN("unexpected error", K(ret), "Info", e.what());
|
||||
}
|
||||
} catch(...) {
|
||||
@ -387,23 +388,23 @@ int ObOrcTableRowIterator::next_file()
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObOrcTableRowIterator::DataLoader::is_orc_read_utc()
|
||||
bool ObOrcTableRowIterator::DataLoader::is_orc_read_utc(const orc::Type *type)
|
||||
{
|
||||
return true;
|
||||
// TIMESTAMP_INSTANT values are UTC time
|
||||
return type->getKind() == orc::TypeKind::TIMESTAMP_INSTANT;
|
||||
}
|
||||
|
||||
bool ObOrcTableRowIterator::DataLoader::is_ob_type_store_utc(const ObDatumMeta &meta)
|
||||
{
|
||||
return ObTimestampType == meta.type_ || ObDateType == meta.type_
|
||||
|| (lib::is_mysql_mode() && ObDateTimeType == meta.type_)
|
||||
|| (lib::is_mysql_mode() && ObTimeType == meta.type_);
|
||||
return (lib::is_mysql_mode() && ObTimestampType == meta.type_)
|
||||
|| (lib::is_oracle_mode() && ObTimestampLTZType == meta.type_);
|
||||
}
|
||||
|
||||
int64_t ObOrcTableRowIterator::DataLoader::calc_tz_adjust_us()
|
||||
{
|
||||
int64_t res = 0;
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_utc_src = is_orc_read_utc();
|
||||
bool is_utc_src = is_orc_read_utc(col_type_);
|
||||
bool is_utc_dst = is_ob_type_store_utc(file_col_expr_->datum_meta_);
|
||||
if (is_utc_src != is_utc_dst) {
|
||||
int32_t tmp_offset = 0;
|
||||
@ -463,6 +464,8 @@ ObOrcTableRowIterator::DataLoader::LOAD_FUNC ObOrcTableRowIterator::DataLoader::
|
||||
if (temp_obj.get_tight_data_len() < orc_data_len) {
|
||||
func = NULL;
|
||||
}
|
||||
} else if (ob_is_year_tc(datum_type.type_) && orc::TypeKind::INT == type_kind) {
|
||||
func = &DataLoader::load_year_vec;
|
||||
} else if (ob_is_string_tc(datum_type.type_) || ob_is_enum_or_set_type(datum_type.type_)) {
|
||||
//convert orc enum/string to enum/string vector
|
||||
switch (type_kind) {
|
||||
@ -482,7 +485,8 @@ ObOrcTableRowIterator::DataLoader::LOAD_FUNC ObOrcTableRowIterator::DataLoader::
|
||||
} else if (ob_is_number_or_decimal_int_tc(datum_type.type_)) {
|
||||
//convert orc int storing as int32/int64 to number/decimal vector
|
||||
if (type_kind == orc::TypeKind::DECIMAL) {
|
||||
if (col_desc->getPrecision() != datum_type.precision_ || col_desc->getScale() != datum_type.scale_) {
|
||||
if (col_desc->getPrecision() != ((datum_type.precision_ == -1) ? 38 : datum_type.precision_)
|
||||
|| col_desc->getScale() != datum_type.scale_) {
|
||||
func = NULL;
|
||||
} else if (col_desc->getPrecision() == 0 || col_desc->getPrecision() > 18) {
|
||||
func = &DataLoader::load_dec128_vec;
|
||||
@ -514,7 +518,8 @@ ObOrcTableRowIterator::DataLoader::LOAD_FUNC ObOrcTableRowIterator::DataLoader::
|
||||
ob_is_datetime(datum_type.type_) ||
|
||||
ob_is_time_tc(datum_type.type_) ||
|
||||
ObTimestampType == datum_type.type_ ||
|
||||
ObTimestampLTZType == datum_type.type_) {
|
||||
ObTimestampLTZType == datum_type.type_ ||
|
||||
ObTimestampNanoType == datum_type.type_) {
|
||||
func = &DataLoader::load_timestamp_vec;
|
||||
}
|
||||
break;
|
||||
@ -587,16 +592,30 @@ int ObOrcTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("row reader is null", K(ret));
|
||||
} else {
|
||||
orc_batch_ = row_reader_->createRowBatch(capacity);
|
||||
if (!orc_batch_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("create orc row batch failed", K(ret));
|
||||
} else if (row_reader_->next(*orc_batch_)) {
|
||||
//ok
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("read next batch failed", K(ret), K(state_.cur_stripe_read_row_count_), K(state_.cur_stripe_row_count_));
|
||||
try {
|
||||
orc_batch_ = row_reader_->createRowBatch(capacity);
|
||||
if (!orc_batch_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("create orc row batch failed", K(ret));
|
||||
} else if (row_reader_->next(*orc_batch_)) {
|
||||
//ok
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("read next batch failed", K(ret), K(state_.cur_stripe_read_row_count_), K(state_.cur_stripe_row_count_));
|
||||
}
|
||||
} catch(const std::exception& e) {
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = OB_ORC_READ_ERROR;
|
||||
LOG_USER_ERROR(OB_ORC_READ_ERROR, e.what());
|
||||
LOG_WARN("unexpected error", K(ret), "Info", e.what());
|
||||
}
|
||||
} catch(...) {
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
//load vec data from orc file to file column expr
|
||||
for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) {
|
||||
if (OB_ISNULL(file_column_exprs_.at(i))) {
|
||||
@ -605,15 +624,18 @@ int ObOrcTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
|
||||
} else {
|
||||
int idx = -1;
|
||||
int64_t col_id = -1;
|
||||
const orc::Type *col_type = nullptr;
|
||||
ObDataAccessPathExtraInfo *data_access_info =
|
||||
static_cast<ObDataAccessPathExtraInfo *>(file_column_exprs_.at(i)->extra_info_);
|
||||
CK (data_access_info != nullptr);
|
||||
CK (data_access_info->data_access_path_.ptr() != nullptr);
|
||||
CK (data_access_info->data_access_path_.length() != 0);
|
||||
OZ (name_to_id_.get_refactored(ObString(data_access_info->data_access_path_.length(), data_access_info->data_access_path_.ptr()), col_id));
|
||||
|
||||
OZ (id_to_type_.get_refactored(col_id, col_type));
|
||||
ObArray<int> idxs;
|
||||
OZ (get_data_column_batch_idxs(&row_reader_->getSelectedType(), col_id, idxs));
|
||||
DataLoader loader(eval_ctx, file_column_exprs_.at(i), orc_batch_, capacity, idxs, read_count);
|
||||
DataLoader loader(eval_ctx, file_column_exprs_.at(i), orc_batch_, capacity, idxs, read_count, col_type);
|
||||
OZ (file_column_exprs_.at(i)->init_vector_for_write(
|
||||
eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_));
|
||||
OZ (loader.load_data_for_col(load_funcs_.at(i)));
|
||||
@ -785,6 +807,72 @@ int ObOrcTableRowIterator::DataLoader::load_int64_vec()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObOrcTableRowIterator::DataLoader::load_year_vec()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t values_cnt = 0;
|
||||
ObEvalCtx::TempAllocGuard tmp_alloc_g(eval_ctx_);
|
||||
CK (OB_NOT_NULL(file_col_expr_));
|
||||
if (OB_SUCC(ret)) {
|
||||
ObFixedLengthBase *int32_vec = static_cast<ObFixedLengthBase *>(file_col_expr_->get_vector(eval_ctx_));
|
||||
CK (OB_NOT_NULL(int32_vec));
|
||||
CK (VEC_FIXED == int32_vec->get_format());
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_ISNULL(batch_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("read orc next batch failed", K(ret));
|
||||
} else {
|
||||
row_count_ = batch_->numElements;
|
||||
orc::StructVectorBatch *root = dynamic_cast<orc::StructVectorBatch *>(batch_.get());
|
||||
if (OB_ISNULL(root)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("dynamic cast orc column vector batch failed", K(ret));
|
||||
}
|
||||
CK (root->fields.size() > 0);
|
||||
CK (idxs_.count() > 0);
|
||||
if (OB_SUCC(ret)) {
|
||||
orc::StructVectorBatch *cb = root;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < idxs_.count() - 1; i++) {
|
||||
CK (root->fields.size() > idxs_.at(i));
|
||||
if (OB_SUCC(ret)) {
|
||||
cb = dynamic_cast<orc::StructVectorBatch *>(cb->fields[idxs_.at(i)]);
|
||||
CK (cb != nullptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
orc::LongVectorBatch *long_batch = dynamic_cast<orc::LongVectorBatch *>(root->fields[idxs_.at(idxs_.count() - 1)]);
|
||||
if (OB_ISNULL(long_batch)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("dynamic cast orc type failed", K(ret));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < row_count_; i++) {
|
||||
if (long_batch->hasNulls) {
|
||||
CK (OB_NOT_NULL(long_batch->notNull.data()));
|
||||
if (OB_SUCC(ret)) {
|
||||
const uint8_t* valid_bytes = reinterpret_cast<const uint8_t*>(long_batch->notNull.data()) + i;
|
||||
if (OB_ISNULL(valid_bytes)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("orc not null batch valid bytes is null", K(ret));
|
||||
} else if (*valid_bytes == 1) {
|
||||
int32_vec->set_year(i, long_batch->data[i]);
|
||||
} else {
|
||||
int32_vec->set_null(i);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int32_vec->set_year(i, long_batch->data[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_DEBUG("load year vec", K(ret), K(row_count_));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObOrcTableRowIterator::DataLoader::load_int32_vec()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -128,19 +128,22 @@ private:
|
||||
std::unique_ptr<orc::ColumnVectorBatch> &batch,
|
||||
const int64_t batch_size,
|
||||
const ObIArray<int> &idxs,
|
||||
int64_t &row_count):
|
||||
int64_t &row_count,
|
||||
const orc::Type *col_type):
|
||||
eval_ctx_(eval_ctx),
|
||||
file_col_expr_(file_col_expr),
|
||||
batch_(batch),
|
||||
batch_size_(batch_size),
|
||||
idxs_(idxs),
|
||||
row_count_(row_count)
|
||||
row_count_(row_count),
|
||||
col_type_(col_type)
|
||||
{}
|
||||
typedef int (DataLoader::*LOAD_FUNC)();
|
||||
static LOAD_FUNC select_load_function(const ObDatumMeta &datum_type,
|
||||
const orc::Type &type);
|
||||
int load_data_for_col(LOAD_FUNC &func);
|
||||
int load_string_col();
|
||||
int load_year_vec();
|
||||
int load_int32_vec();
|
||||
int load_int64_vec();
|
||||
int load_timestamp_vec();
|
||||
@ -150,7 +153,7 @@ private:
|
||||
int load_dec128_vec();
|
||||
int load_dec64_vec();
|
||||
|
||||
bool is_orc_read_utc();
|
||||
bool is_orc_read_utc(const orc::Type *type);
|
||||
bool is_ob_type_store_utc(const ObDatumMeta &meta);
|
||||
|
||||
int64_t calc_tz_adjust_us();
|
||||
@ -160,6 +163,7 @@ private:
|
||||
const int64_t batch_size_;
|
||||
const ObIArray<int> &idxs_;
|
||||
int64_t &row_count_;
|
||||
const orc::Type *col_type_;
|
||||
};
|
||||
private:
|
||||
int next_file();
|
||||
|
Loading…
x
Reference in New Issue
Block a user