From 1dfd6e83266d9ad278bb50517487f7473e99e09f Mon Sep 17 00:00:00 2001 From: wjhh2008 Date: Wed, 18 Sep 2024 06:20:48 +0000 Subject: [PATCH] fix parquet memory allocate bug --- .../table/ob_parquet_table_row_iter.cpp | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/sql/engine/table/ob_parquet_table_row_iter.cpp b/src/sql/engine/table/ob_parquet_table_row_iter.cpp index 360ac10fe..322d46a8c 100644 --- a/src/sql/engine/table/ob_parquet_table_row_iter.cpp +++ b/src/sql/engine/table/ob_parquet_table_row_iter.cpp @@ -57,7 +57,7 @@ arrow::Status ObArrowMemPool::Reallocate(int64_t old_size, int64_t new_size, uin uint8_t* old = *ptr; arrow::Status status_ret = Allocate(new_size, ptr); if (arrow::Status::OK() == status_ret) { - MEMCPY(ptr, old, old_size); + MEMCPY(*ptr, old, std::min(old_size, new_size)); Free(old, old_size); } LOG_DEBUG("ObArrowMemPool::Reallocate", K(old_size), K(new_size), "stack", lbt()); @@ -1313,20 +1313,28 @@ int ObParquetTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) if (!file_column_exprs_.count()) { read_count = std::min(capacity, state_.cur_row_group_row_count_ - state_.cur_row_group_read_row_count_); } else { - //load vec data from parquet file to file column expr - for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) { - if (OB_UNLIKELY(!column_readers_.at(i).get()->HasNext())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("page end unexpected", K(ret)); - } - if (OB_SUCC(ret)) { - DataLoader loader(eval_ctx, file_column_exprs_.at(i), column_readers_.at(i).get(), - def_levels_buf_, rep_levels_buf_, capacity, read_count); - OZ (file_column_exprs_.at(i)->init_vector_for_write( - eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_)); - OZ (loader.load_data_for_col(load_funcs_.at(i))); - file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx); + try { + //load vec data from parquet file to file column expr + for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) { + if (OB_UNLIKELY(!column_readers_.at(i).get()->HasNext())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("page end unexpected", K(ret)); + } + if (OB_SUCC(ret)) { + DataLoader loader(eval_ctx, file_column_exprs_.at(i), column_readers_.at(i).get(), + def_levels_buf_, rep_levels_buf_, capacity, read_count); + OZ (file_column_exprs_.at(i)->init_vector_for_write( + eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_)); + OZ (loader.load_data_for_col(load_funcs_.at(i))); + file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx); + } } + } catch(const std::exception& e) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected index", K(ret), "Info", e.what()); + } catch(...) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected index", K(ret)); } } if (OB_SUCC(ret) && read_count > 0) {