fix topn bug: comp not updated when new batch be added
This commit is contained in:
@ -224,8 +224,11 @@ protected:
|
||||
void set_blk_holder(ObTempRowStore::BlockHolder *blk_holder);
|
||||
// for topn sort
|
||||
int add_heap_sort_row(const Store_Row *&store_row);
|
||||
// load data to comp_
|
||||
int load_data_to_comp(const ObBatchRows &input_brs);
|
||||
int add_heap_sort_batch(const ObBatchRows &input_brs, const int64_t start_pos /* 0 */,
|
||||
int64_t *append_row_count = nullptr);
|
||||
int64_t *append_row_count = nullptr,
|
||||
bool need_load_data = true);
|
||||
int add_heap_sort_batch(const ObBatchRows &input_brs, const uint16_t selector[],
|
||||
const int64_t size);
|
||||
int adjust_topn_heap(const Store_Row *&store_row);
|
||||
|
||||
@ -353,11 +353,13 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::add_batch(const ObBatchRows
|
||||
SQL_ENG_LOG(WARN, "failed to do topn dump", K(ret));
|
||||
}
|
||||
}
|
||||
bool need_load_data = true;
|
||||
if (OB_SUCC(ret)) {
|
||||
const ObBatchRows *input_brs_ptr = nullptr;
|
||||
if (is_topn_sort() && OB_NOT_NULL(topn_filter_) && OB_LIKELY(!topn_filter_->is_by_pass())) {
|
||||
if (OB_FAIL(batch_eval_vector(all_exprs_, input_brs))) {
|
||||
SQL_ENG_LOG(WARN, "failed to eval vector", K(ret));
|
||||
need_load_data = false;
|
||||
if (OB_FAIL(load_data_to_comp(input_brs))) {
|
||||
SQL_ENG_LOG(WARN, "failed to load data", K(ret));
|
||||
} else if (OB_FAIL(topn_filter_->filter(all_exprs_, *eval_ctx_, start_pos,
|
||||
input_brs))) {
|
||||
SQL_ENG_LOG(WARN, "failed to do topn filter", K(ret));
|
||||
@ -369,7 +371,8 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::add_batch(const ObBatchRows
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (use_heap_sort_) {
|
||||
ret = add_heap_sort_batch(*input_brs_ptr, start_pos, append_row_count);
|
||||
ret = add_heap_sort_batch(*input_brs_ptr, start_pos, append_row_count,
|
||||
need_load_data);
|
||||
} else {
|
||||
ret = add_quick_sort_batch(*input_brs_ptr, start_pos, append_row_count);
|
||||
}
|
||||
@ -935,9 +938,24 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::batch_eval_vector(
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename Compare, typename Store_Row, bool has_addon>
|
||||
int ObSortVecOpImpl<Compare, Store_Row, has_addon>::load_data_to_comp(const ObBatchRows &input_brs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(batch_eval_vector(all_exprs_, input_brs))) {
|
||||
SQL_ENG_LOG(WARN, "failed to eval vector", K(ret));
|
||||
} else if (OB_FAIL(sort_exprs_getter_.fetch_payload(input_brs))) {
|
||||
SQL_ENG_LOG(WARN, "failed to batch fetch sort key payload", K(ret));
|
||||
} else if(FALSE_IT(comp_.set_sort_key_col_result_list(
|
||||
sort_exprs_getter_.get_sk_col_result_list()))) {
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
template <typename Compare, typename Store_Row, bool has_addon>
|
||||
int ObSortVecOpImpl<Compare, Store_Row, has_addon>::add_heap_sort_batch(
|
||||
const ObBatchRows &input_brs, const int64_t start_pos /* 0 */, int64_t *append_row_count)
|
||||
const ObBatchRows &input_brs, const int64_t start_pos /* 0 */, int64_t *append_row_count,
|
||||
bool need_load_data)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t row_count = 0;
|
||||
@ -946,12 +964,8 @@ int ObSortVecOpImpl<Compare, Store_Row, has_addon>::add_heap_sort_batch(
|
||||
batch_info_guard.set_batch_size(input_brs.size_);
|
||||
if (OB_FAIL(update_max_available_mem_size_periodically())) {
|
||||
SQL_ENG_LOG(WARN, "failed to update max available mem size periodically", K(ret));
|
||||
} else if (OB_FAIL(batch_eval_vector(all_exprs_, input_brs))) {
|
||||
SQL_ENG_LOG(WARN, "failed to eval vector", K(ret));
|
||||
} else if (OB_FAIL(sort_exprs_getter_.fetch_payload(input_brs))) {
|
||||
SQL_ENG_LOG(WARN, "failed to batch fetch sort key payload", K(ret));
|
||||
} else if (FALSE_IT(comp_.set_sort_key_col_result_list(
|
||||
sort_exprs_getter_.get_sk_col_result_list()))) {
|
||||
} else if (need_load_data && OB_FAIL(load_data_to_comp(input_brs))) {
|
||||
SQL_ENG_LOG(WARN, "failed to load data", K(ret));
|
||||
} else {
|
||||
for (int64_t i = start_pos; OB_SUCC(ret) && i < input_brs.size_; i++) {
|
||||
if (input_brs.skip_->exist(i)) {
|
||||
|
||||
Reference in New Issue
Block a user