diff --git a/src/sql/engine/window_function/ob_window_function_op.cpp b/src/sql/engine/window_function/ob_window_function_op.cpp index d97439681f..8629d59610 100644 --- a/src/sql/engine/window_function/ob_window_function_op.cpp +++ b/src/sql/engine/window_function/ob_window_function_op.cpp @@ -3387,114 +3387,117 @@ int ObWindowFunctionOp::process_child_batch( ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_); guard.set_batch_size(child_brs->size_); guard.set_batch_idx(batch_idx); - //When find a big partition, store remaining rows of the batch to %remain row store - RowsStore ¤t = *input_rows_.cur_; - RowsStore &processed = *input_rows_.processed_; - RowsStore &remain = processed.is_empty() ? processed : current; - const bool need_split_store = processed.is_empty(); bool found_next_part = false; OZ(check_stack_overflow()); - while (OB_SUCC(ret) && !found_next_part) { - // handle current batch + while (OB_SUCC(ret) && !found_next_part) { // handle current batch const ObBitVector &skip = *child_brs->skip_; - while (OB_SUCC(ret) && row_idx < child_brs->size_ && !found_next_part) { - bool same_part = false; - guard.set_batch_idx(row_idx); - int64_t row_cnt_inc = 0; - if (skip.contain(row_idx)) { - ++row_idx; - continue; - } else if (OB_FAIL(check_same_partition(*first, same_part))) { - LOG_WARN("check same partition failed", K(ret)); - } else if (!same_part) { - if (OB_FAIL(check_wf_same_partition(end))) { - LOG_WARN("check wf same partition failed", K(ret)); - } else if (OB_FAIL(found_part_end(end, input_rows_.cur_))) { - LOG_WARN("found_part_end failed", K(ret)); - } else if (end != wf_list_.get_header()) { - if (OB_FAIL(found_new_part(false))) { - LOG_WARN("save partition by exprs failed", K(ret)); - } else if (OB_FAIL(current.add_row_with_index(row_idx, get_all_expr(), - &eval_ctx_, NULL, false))) { - LOG_WARN("add row to rows_store failed", K(ret)); + bool need_loop_until_child_brs_end = true; + while (OB_SUCC(ret) && need_loop_until_child_brs_end) { + need_loop_until_child_brs_end = false; + found_next_part = false; + RowsStore ¤t = *input_rows_.cur_; + RowsStore &processed = *input_rows_.processed_; + // When find a big partition, store remaining rows of the batch to %remain row store + RowsStore &remain = processed.is_empty() ? processed : current; + const bool need_split_store = processed.is_empty(); + while (OB_SUCC(ret) && row_idx < child_brs->size_ && !found_next_part) { + bool same_part = false; + guard.set_batch_idx(row_idx); + int64_t row_cnt_inc = 0; + if (skip.contain(row_idx)) { + ++row_idx; + continue; + } else if (OB_FAIL(check_same_partition(*first, same_part))) { + LOG_WARN("check same partition failed", K(ret)); + } else if (!same_part) { + if (OB_FAIL(check_wf_same_partition(end))) { + LOG_WARN("check wf same partition failed", K(ret)); + } else if (OB_FAIL(found_part_end(end, input_rows_.cur_))) { + LOG_WARN("found_part_end failed", K(ret)); + } else if (end != wf_list_.get_header()) { + if (OB_FAIL(found_new_part(false))) { + LOG_WARN("save partition by exprs failed", K(ret)); + } else if (OB_FAIL(current.add_row_with_index(row_idx, get_all_expr(), + &eval_ctx_, NULL, false))) { + LOG_WARN("add row to rows_store failed", K(ret)); + } else { + // the biggest part end : remain row_cnt_ + 0, store_row_cnt_ + 1 + // other parts end : after compute_wf_values, current row_cnt_ + 1, store_row_cnt_ + 1 + row_cnt_inc = 1; + } } else { - // the biggest part end : remain row_cnt_ + 0, store_row_cnt_ + 1 - // other parts end : after compute_wf_values, current row_cnt_ + 1, store_row_cnt_ + 1 - row_cnt_inc = 1; + found_next_part = true; + if (OB_FAIL(found_new_part(false))) { + LOG_WARN("save partition by exprs failed", K(ret)); + } else if (OB_FAIL(remain.add_row_with_index(row_idx, get_all_expr(), &eval_ctx_, + NULL, false))) { + LOG_WARN("add row to rows_store failed", K(ret)); + } } - } else { - found_next_part = true; - if (OB_FAIL(found_new_part(false))) { + if (OB_FAIL(ret)) { + } else if (OB_FAIL(compute_wf_values(end, check_times))) { + LOG_WARN("compute wf values failed", K(ret)); + } else if (OB_FAIL(save_part_first_row_idx())) { LOG_WARN("save partition by exprs failed", K(ret)); - } else if (OB_FAIL(remain.add_row_with_index(row_idx, get_all_expr(), &eval_ctx_, - NULL, false))) { + } else { + // compute_wf_values will use current.row_cnt to decide which rows need to compute + // so add row_cnt_inc to current.row_cnt after compute_wf_values + current.row_cnt_ += row_cnt_inc; + } + } else { // same part + if (MY_SPEC.is_participator()) { + MY_SPEC.wf_aggr_status_expr_->locate_datum_for_write(eval_ctx_).set_int(last_aggr_status_); + } + if (OB_FAIL(current.add_row_with_index(row_idx, get_all_expr(), &eval_ctx_))) { LOG_WARN("add row to rows_store failed", K(ret)); } } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(compute_wf_values(end, check_times))) { - LOG_WARN("compute wf values failed", K(ret)); - } else if (OB_FAIL(save_part_first_row_idx())) { - LOG_WARN("save partition by exprs failed", K(ret)); - } else { - // compute_wf_values will use current.row_cnt to decide which rows need to compute - // so add row_cnt_inc to current.row_cnt after compute_wf_values - current.row_cnt_ += row_cnt_inc; - } - } else { // same part - if (MY_SPEC.is_participator()) { - MY_SPEC.wf_aggr_status_expr_->locate_datum_for_write(eval_ctx_).set_int(last_aggr_status_); - } - if (OB_FAIL(current.add_row_with_index(row_idx, get_all_expr(), &eval_ctx_))) { - LOG_WARN("add row to rows_store failed", K(ret)); + if (OB_SUCC(ret) && !found_next_part) { + ++row_idx; } } - if (OB_SUCC(ret) && !found_next_part) { - ++row_idx; - } - } - - if (OB_SUCC(ret)) { - if (found_next_part) { - if (need_split_store) { - // switch %cur_ and %processed_ row store if necessary - // %cur_ always be rows store we are computing. - // And rows in %processed_ row store are all computed and ready to output. - // swap $cur_ and $processed_ row store - foreach_stores([](Stores &s){ std::swap(s.cur_, s.processed_); return OB_SUCCESS; }); - if (MY_SPEC.range_dist_parallel_ && !first_part_saved_) { - first_part_saved_ = true; - foreach_stores([](Stores &s){ std::swap(s.first_, s.processed_); return OB_SUCCESS; }); + if (OB_SUCC(ret)) { + if (found_next_part) { + if (need_split_store) { + // switch %cur_ and %processed_ row store if necessary + // %cur_ always be rows store we are computing. + // And rows in %processed_ row store are all computed and ready to output. + // swap $cur_ and $processed_ row store + foreach_stores([](Stores &s){ std::swap(s.cur_, s.processed_); return OB_SUCCESS; }); + if (MY_SPEC.range_dist_parallel_ && !first_part_saved_) { + first_part_saved_ = true; + foreach_stores([](Stores &s){ std::swap(s.first_, s.processed_); return OB_SUCCESS; }); + } } - } - ++row_idx; - ++remain.row_cnt_; - if (need_split_store && OB_FAIL(save_part_first_row_idx())) { - LOG_WARN("save partition by exprs failed", K(ret)); - } else if (OB_FAIL(process_child_batch(row_idx, child_brs, check_times))) { - LOG_WARN("add row to rows_store failed", K(ret)); - } - } else if (!child_iter_end_) { - if (need_split_store) { // need_split_store is true means we have not found next part ever - if (OB_FAIL(get_next_batch_from_child(batch_size, child_brs))) { - LOG_WARN("get child next batch failed", K(ret)); + ++row_idx; + ++remain.row_cnt_; + if (need_split_store && OB_FAIL(save_part_first_row_idx())) { + LOG_WARN("save partition by exprs failed", K(ret)); + } else { // need to deal with remain rows in this child_brs after found new part + need_loop_until_child_brs_end = true; + } + } else if (!child_iter_end_) { + if (need_split_store) { // need_split_store is true means we have not found next part ever + if (OB_FAIL(get_next_batch_from_child(batch_size, child_brs))) { + LOG_WARN("get child next batch failed", K(ret)); + } else { + child_iter_end_ = child_brs->end_; + row_idx = 0; + guard.set_batch_size(child_brs->size_); + } } else { - child_iter_end_ = child_brs->end_; - row_idx = 0; - guard.set_batch_size(child_brs->size_); + found_next_part = true; + current.row_cnt_ = wf_list_.get_last()->part_first_row_idx_; } - } else { + } else { // child_iter_end_ found_next_part = true; - current.row_cnt_ = wf_list_.get_last()->part_first_row_idx_; - } - } else { // child_iter_end_ - found_next_part = true; - // add aggr result row for the last part - if (OB_FAIL(found_part_end(wf_list_.get_header(), input_rows_.cur_))) { - LOG_WARN("found_part_end failed", K(ret), K(last_aggr_status_)); - } else if (OB_FAIL(compute_wf_values(wf_list_.get_header(), check_times))) { - LOG_WARN("compute wf values failed", K(ret)); + // add aggr result row for the last part + if (OB_FAIL(found_part_end(wf_list_.get_header(), input_rows_.cur_))) { + LOG_WARN("found_part_end failed", K(ret), K(last_aggr_status_)); + } else if (OB_FAIL(compute_wf_values(wf_list_.get_header(), check_times))) { + LOG_WARN("compute wf values failed", K(ret)); + } } } }