fix parquet file itertor read batch bug

This commit is contained in:
wjhh2008 2024-11-07 03:44:01 +00:00 committed by ob-robot
parent f0b15b95b2
commit 8ccbe18a84
2 changed files with 101 additions and 55 deletions

View File

@ -203,6 +203,7 @@ int ObParquetTableRowIterator::init(const storage::ObTableScanParam *scan_param)
ObEvalCtx &eval_ctx = scan_param->op_->get_eval_ctx();
mem_attr_ = ObMemAttr(MTL_ID(), "ParquetRowIter");
allocator_.set_attr(mem_attr_);
str_res_mem_.set_attr(mem_attr_);
arrow_alloc_.init(MTL_ID());
OZ (ObExternalTableRowIterator::init(scan_param));
OZ (data_access_driver_.init(scan_param->external_file_location_,
@ -595,13 +596,13 @@ int ObParquetTableRowIterator::DataLoader::load_int32_to_int32_vec()
values.get_data(), &values_cnt);
int j = 0;
if (IS_PARQUET_COL_NOT_NULL && values_cnt == row_count_) {
MEMCPY(pointer_cast<int32_t*>(dec_vec->get_data()), values.get_data(), sizeof(int32_t) * row_count_);
MEMCPY(pointer_cast<int32_t*>(dec_vec->get_data()) + row_offset_, values.get_data(), sizeof(int32_t) * row_count_);
} else {
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
dec_vec->set_null(i + row_offset_);
} else {
dec_vec->set_int32(i, values.at(j++));
dec_vec->set_int32(i + row_offset_, values.at(j++));
}
}
}
@ -771,9 +772,9 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_);
} else {
OZ (to_numeric(i, values.at(j++)));
OZ (to_numeric(i + row_offset_, values.at(j++)));
}
}
} else if (reader_->descr()->physical_type() == parquet::Type::type::INT64) {
@ -785,9 +786,9 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_);
} else {
OZ (to_numeric(i, values.at(j++)));
OZ (to_numeric(i + row_offset_, values.at(j++)));
}
}
} else if (reader_->descr()->physical_type() == parquet::Type::Type::FIXED_LEN_BYTE_ARRAY) {
@ -803,10 +804,10 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_);
} else {
parquet::FixedLenByteArray &cur_v = values.at(j++);
OZ (to_numeric_hive(i, pointer_cast<const char*>(cur_v.ptr), fixed_length, buffer.get_data(), buffer.count()));
OZ (to_numeric_hive(i + row_offset_, pointer_cast<const char*>(cur_v.ptr), fixed_length, buffer.get_data(), buffer.count()));
//OZ (to_numeric(i, pointer_cast<const char*>(cur_v.ptr), fixed_length));
}
}
@ -822,10 +823,10 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_);
} else {
parquet::ByteArray &cur_v = values.at(j++);
OZ (to_numeric_hive(i, pointer_cast<const char*>(cur_v.ptr), cur_v.len, buffer.get_data(), buffer.count()));
OZ (to_numeric_hive(i + row_offset_, pointer_cast<const char*>(cur_v.ptr), cur_v.len, buffer.get_data(), buffer.count()));
//OZ (to_numeric(i, pointer_cast<const char*>(cur_v.ptr), cur_v.len));
}
}
@ -859,16 +860,30 @@ int ObParquetTableRowIterator::DataLoader::load_fixed_string_col()
int j = 0;
for (int i = 0; i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
text_vec->set_null(i);
text_vec->set_null(i + row_offset_);
} else {
void *res_ptr = NULL;
parquet::FixedLenByteArray &cur_v = values.at(j++);
text_vec->set_string(i, pointer_cast<const char *>(cur_v.ptr), fixed_length);
if (OB_UNLIKELY(fixed_length > file_col_expr_->max_length_
&& (is_byte_length || ObCharset::strlen_char(CS_TYPE_UTF8MB4_BIN,
pointer_cast<const char *>(cur_v.ptr),
fixed_length) > file_col_expr_->max_length_))) {
ret = OB_ERR_DATA_TOO_LONG;
LOG_WARN("data too long", K(ret));
} else {
if (row_count_ == batch_size_) {
res_ptr = (void*)(cur_v.ptr);
} else {
//when row_count_ less than batch_size_, it may reach page end and reload next page
//string values need deep copy
if (OB_ISNULL(res_ptr = str_res_mem_.alloc(fixed_length))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", K(fixed_length));
} else {
MEMCPY(res_ptr, cur_v.ptr, fixed_length);
}
}
text_vec->set_string(i + row_offset_, pointer_cast<const char *>(res_ptr), fixed_length);
}
}
}
@ -902,19 +917,33 @@ int ObParquetTableRowIterator::DataLoader::load_string_col()
int j = 0;
for (int i = 0; i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
text_vec->set_null(i);
text_vec->set_null(i + row_offset_);
} else {
parquet::ByteArray &cur_v = values.at(j++);
if (is_oracle_mode && 0 == cur_v.len) {
text_vec->set_null(i);
text_vec->set_null(i + row_offset_);
} else {
text_vec->set_string(i, pointer_cast<const char *>(cur_v.ptr), cur_v.len);
void *res_ptr = NULL;
if (OB_UNLIKELY(cur_v.len > file_col_expr_->max_length_
&& (is_byte_length || ObCharset::strlen_char(CS_TYPE_UTF8MB4_BIN,
pointer_cast<const char *>(cur_v.ptr),
cur_v.len) > file_col_expr_->max_length_))) {
ret = OB_ERR_DATA_TOO_LONG;
LOG_WARN("data too long", K(ret));
} else {
if (row_count_ == batch_size_) {
res_ptr = (void *)(cur_v.ptr);
} else {
//when row_count_ less than batch_size_, it may reach page end and reload next page
//string values need deep copy
if (OB_ISNULL(res_ptr = str_res_mem_.alloc(cur_v.len))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", K(cur_v.len));
} else {
MEMCPY(res_ptr, cur_v.ptr, cur_v.len);
}
}
text_vec->set_string(i + row_offset_, pointer_cast<const char *>(res_ptr), cur_v.len);
}
}
}
@ -946,9 +975,9 @@ int ObParquetTableRowIterator::DataLoader::load_int32_to_int64_vec()
int j = 0;
for (int i = 0; i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
int32_vec->set_null(i);
int32_vec->set_null(i + row_offset_);
} else {
int32_vec->set_int(i, values.at(j++));
int32_vec->set_int(i + row_offset_, values.at(j++));
}
}
}
@ -975,14 +1004,14 @@ int ObParquetTableRowIterator::DataLoader::load_int64_to_int64_vec()
ret = OB_NOT_SUPPORTED;
LOG_WARN("repeated data not support");
} else if (IS_PARQUET_COL_NOT_NULL && values_cnt == row_count_) {
MEMCPY(pointer_cast<int64_t*>(int64_vec->get_data()), values.get_data(), sizeof(int64_t) * row_count_);
MEMCPY(pointer_cast<int64_t*>(int64_vec->get_data()) + row_count_, values.get_data(), sizeof(int64_t) * row_count_);
} else {
int j = 0;
for (int i = 0; i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
int64_vec->set_null(i);
int64_vec->set_null(i + row_offset_);
} else {
int64_vec->set_int(i, values.at(j++));
int64_vec->set_int(i + row_offset_, values.at(j++));
}
}
}
@ -1007,9 +1036,9 @@ int ObParquetTableRowIterator::DataLoader::load_date_col_to_datetime()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
dec_vec->set_null(i + row_offset_);
} else {
dec_vec->set_datetime(i, values.at(j++) * USECS_PER_DAY);
dec_vec->set_datetime(i + row_offset_, values.at(j++) * USECS_PER_DAY);
}
}
}
@ -1032,9 +1061,9 @@ int ObParquetTableRowIterator::DataLoader::load_time_millis_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
dec_vec->set_null(i + row_offset_);
} else {
dec_vec->set_time(i, values.at(j++) * USECS_PER_MSEC);
dec_vec->set_time(i + row_offset_, values.at(j++) * USECS_PER_MSEC);
}
}
}
@ -1057,9 +1086,9 @@ int ObParquetTableRowIterator::DataLoader::load_time_nanos_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
dec_vec->set_null(i + row_offset_);
} else {
dec_vec->set_time(i, values.at(j++) / NSECS_PER_USEC);
dec_vec->set_time(i + row_offset_, values.at(j++) / NSECS_PER_USEC);
}
}
}
@ -1111,15 +1140,15 @@ int ObParquetTableRowIterator::DataLoader::load_timestamp_millis_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
dec_vec->set_null(i + row_offset_);
} else {
int64_t adjusted_value = values.at(j++) * USECS_PER_MSEC + adjust_us;
if (ObTimestampType == file_col_expr_->datum_meta_.type_) {
dec_vec->set_timestamp(i, adjusted_value);
dec_vec->set_timestamp(i + row_offset_, adjusted_value);
} else {
ObOTimestampData data;
data.time_us_ = adjusted_value;
dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data));
dec_vec->set_otimestamp_tiny(i + row_offset_, ObOTimestampTinyData().from_timestamp_data(data));
}
}
}
@ -1144,15 +1173,15 @@ int ObParquetTableRowIterator::DataLoader::load_timestamp_micros_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
dec_vec->set_null(i + row_offset_);
} else {
int64_t adjusted_value = (values.at(j++) + adjust_us);
if (ObTimestampType == file_col_expr_->datum_meta_.type_) {
dec_vec->set_timestamp(i, adjusted_value);
dec_vec->set_timestamp(i + row_offset_, adjusted_value);
} else {
ObOTimestampData data;
data.time_us_ = adjusted_value;
dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data));
dec_vec->set_otimestamp_tiny(i + row_offset_, ObOTimestampTinyData().from_timestamp_data(data));
}
}
}
@ -1177,16 +1206,16 @@ int ObParquetTableRowIterator::DataLoader::load_timestamp_nanos_col()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
dec_vec->set_null(i + row_offset_);
} else {
if (ObTimestampType == file_col_expr_->datum_meta_.type_) {
dec_vec->set_timestamp(i, values.at(j++) / NSECS_PER_USEC + adjust_us);
dec_vec->set_timestamp(i + row_offset_, values.at(j++) / NSECS_PER_USEC + adjust_us);
} else {
ObOTimestampData data;
int64_t cur_value = values.at(j++);
data.time_us_ = cur_value / NSECS_PER_USEC + adjust_us;
data.time_ctx_.set_tail_nsec(cur_value % NSECS_PER_USEC);
dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data));
dec_vec->set_otimestamp_tiny(i + row_offset_, ObOTimestampTinyData().from_timestamp_data(data));
}
}
}
@ -1211,19 +1240,19 @@ int ObParquetTableRowIterator::DataLoader::load_timestamp_hive()
int j = 0;
for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
file_col_expr_->get_vector(eval_ctx_)->set_null(i);
dec_vec->set_null(i + row_offset_);
} else {
parquet::Int96 &value = values.at(j++);
uint64_t nsec_time_value = ((uint64_t)value.value[1] << 32) + (uint64_t)value.value[0];
uint32_t julian_date_value = value.value[2];
int64_t utc_timestamp =((int64_t)julian_date_value - 2440588LL) * 86400000000LL + (int64_t)(nsec_time_value / NSECS_PER_USEC);
if (ObTimestampType == file_col_expr_->datum_meta_.type_) {
dec_vec->set_timestamp(i, utc_timestamp + adjust_us);
dec_vec->set_timestamp(i + row_offset_, utc_timestamp + adjust_us);
} else {
ObOTimestampData data;
data.time_us_ = utc_timestamp + adjust_us;
data.time_ctx_.set_tail_nsec((int32_t)(nsec_time_value % NSECS_PER_USEC));
dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data));
dec_vec->set_otimestamp_tiny(i + row_offset_, ObOTimestampTinyData().from_timestamp_data(data));
}
}
}
@ -1250,14 +1279,14 @@ int ObParquetTableRowIterator::DataLoader::load_float()
ret = OB_NOT_SUPPORTED;
LOG_WARN("repeated data not support");
} else if (IS_PARQUET_COL_NOT_NULL && values_cnt == row_count_) {
MEMCPY(pointer_cast<float*>(float_vec->get_data()), values.get_data(), sizeof(float) * row_count_);
MEMCPY(pointer_cast<float*>(float_vec->get_data()) + row_offset_, values.get_data(), sizeof(float) * row_count_);
} else {
int j = 0;
for (int i = 0; i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
float_vec->set_null(i);
float_vec->set_null(i + row_offset_);
} else {
float_vec->set_float(i, values.at(j++));
float_vec->set_float(i + row_offset_, values.at(j++));
}
}
}
@ -1284,14 +1313,14 @@ int ObParquetTableRowIterator::DataLoader::load_double()
ret = OB_NOT_SUPPORTED;
LOG_WARN("repeated data not support");
} else if (IS_PARQUET_COL_NOT_NULL && values_cnt == row_count_) {
MEMCPY(pointer_cast<double*>(double_vec->get_data()), values.get_data(), sizeof(double) * row_count_);
MEMCPY(pointer_cast<double*>(double_vec->get_data()) + row_offset_, values.get_data(), sizeof(double) * row_count_);
} else {
int j = 0;
for (int i = 0; i < row_count_; i++) {
if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) {
double_vec->set_null(i);
double_vec->set_null(i + row_offset_);
} else {
double_vec->set_double(i, values.at(j++));
double_vec->set_double(i + row_offset_, values.at(j++));
}
}
}
@ -1321,23 +1350,33 @@ int ObParquetTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
if (!file_column_exprs_.count()) {
read_count = std::min(capacity, state_.cur_row_group_row_count_ - state_.cur_row_group_read_row_count_);
} else {
str_res_mem_.reuse();
try {
//load vec data from parquet file to file column expr
for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) {
if (OB_UNLIKELY(!column_readers_.at(i).get()->HasNext())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("page end unexpected", K(ret));
}
if (OB_SUCC(ret)) {
int64_t load_row_count = 0;
OZ (file_column_exprs_.at(i)->init_vector_for_write(
eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_));
while (OB_SUCC(ret) && load_row_count < capacity && column_readers_.at(i).get()->HasNext()) {
int64_t temp_row_count = 0;
DataLoader loader(eval_ctx, file_column_exprs_.at(i), column_readers_.at(i).get(),
def_levels_buf_, rep_levels_buf_, capacity, read_count);
def_levels_buf_, rep_levels_buf_, str_res_mem_,
capacity - load_row_count, load_row_count, temp_row_count);
MEMSET(def_levels_buf_.get_data(), 0, sizeof(def_levels_buf_.at(0)) * eval_ctx.max_batch_size_);
MEMSET(rep_levels_buf_.get_data(), 0, sizeof(rep_levels_buf_.at(0)) * eval_ctx.max_batch_size_);
OZ (file_column_exprs_.at(i)->init_vector_for_write(
eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_));
OZ (loader.load_data_for_col(load_funcs_.at(i)));
file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx);
load_row_count += temp_row_count;
}
if (OB_SUCC(ret)) {
if (0 == read_count) {
read_count = load_row_count;
} else {
if (read_count != load_row_count) {
ret = OB_ERR_UNEXPECTED;
}
}
}
file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx);
}
} catch(const std::exception& e) {
ret = OB_ERR_UNEXPECTED;

View File

@ -156,15 +156,19 @@ private:
parquet::ColumnReader *reader,
common::ObIArrayWrap<int16_t> &def_levels_buf,
common::ObIArrayWrap<int16_t> &rep_levels_buf,
common::ObIAllocator &str_res_mem,
const int64_t batch_size,
const int64_t row_offset,
int64_t &row_count):
eval_ctx_(eval_ctx),
file_col_expr_(file_col_expr),
reader_(reader),
batch_size_(batch_size),
row_offset_(row_offset),
row_count_(row_count),
def_levels_buf_(def_levels_buf),
rep_levels_buf_(rep_levels_buf)
rep_levels_buf_(rep_levels_buf),
str_res_mem_(str_res_mem)
{}
typedef int (DataLoader::*LOAD_FUNC)();
static LOAD_FUNC select_load_function(const ObDatumMeta &datum_type,
@ -202,9 +206,11 @@ private:
ObExpr *file_col_expr_;
parquet::ColumnReader *reader_;
const int64_t batch_size_;
const int64_t row_offset_;
int64_t &row_count_;
common::ObIArrayWrap<int16_t> &def_levels_buf_;
common::ObIArrayWrap<int16_t> &rep_levels_buf_;
common::ObIAllocator &str_res_mem_;
};
private:
int next_file();
@ -215,6 +221,7 @@ private:
StateValues state_;
lib::ObMemAttr mem_attr_;
ObArenaAllocator allocator_;
ObArenaAllocator str_res_mem_;
ObArrowMemPool arrow_alloc_;
parquet::ReaderProperties read_props_;
ObExternalDataAccessDriver data_access_driver_;