fix parquet memory allocate bug

This commit is contained in:
wjhh2008 2024-09-18 06:20:48 +00:00 committed by ob-robot
parent ca22852cc9
commit 1dfd6e8326

View File

@ -57,7 +57,7 @@ arrow::Status ObArrowMemPool::Reallocate(int64_t old_size, int64_t new_size, uin
uint8_t* old = *ptr;
arrow::Status status_ret = Allocate(new_size, ptr);
if (arrow::Status::OK() == status_ret) {
MEMCPY(ptr, old, old_size);
MEMCPY(*ptr, old, std::min(old_size, new_size));
Free(old, old_size);
}
LOG_DEBUG("ObArrowMemPool::Reallocate", K(old_size), K(new_size), "stack", lbt());
@ -1313,20 +1313,28 @@ int ObParquetTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
if (!file_column_exprs_.count()) {
read_count = std::min(capacity, state_.cur_row_group_row_count_ - state_.cur_row_group_read_row_count_);
} else {
//load vec data from parquet file to file column expr
for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) {
if (OB_UNLIKELY(!column_readers_.at(i).get()->HasNext())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("page end unexpected", K(ret));
}
if (OB_SUCC(ret)) {
DataLoader loader(eval_ctx, file_column_exprs_.at(i), column_readers_.at(i).get(),
def_levels_buf_, rep_levels_buf_, capacity, read_count);
OZ (file_column_exprs_.at(i)->init_vector_for_write(
eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_));
OZ (loader.load_data_for_col(load_funcs_.at(i)));
file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx);
try {
//load vec data from parquet file to file column expr
for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) {
if (OB_UNLIKELY(!column_readers_.at(i).get()->HasNext())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("page end unexpected", K(ret));
}
if (OB_SUCC(ret)) {
DataLoader loader(eval_ctx, file_column_exprs_.at(i), column_readers_.at(i).get(),
def_levels_buf_, rep_levels_buf_, capacity, read_count);
OZ (file_column_exprs_.at(i)->init_vector_for_write(
eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_));
OZ (loader.load_data_for_col(load_funcs_.at(i)));
file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx);
}
}
} catch(const std::exception& e) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index", K(ret), "Info", e.what());
} catch(...) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected index", K(ret));
}
}
if (OB_SUCC(ret) && read_count > 0) {