Fix a wf bug caused by the logic of recursion

This commit is contained in:
obdev
2023-04-03 21:11:30 +00:00
committed by ob-robot
parent 7da7fe8b1b
commit a615fb4f61

View File

@ -3387,114 +3387,117 @@ int ObWindowFunctionOp::process_child_batch(
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
guard.set_batch_size(child_brs->size_);
guard.set_batch_idx(batch_idx);
//When find a big partition, store remaining rows of the batch to %remain row store
RowsStore &current = *input_rows_.cur_;
RowsStore &processed = *input_rows_.processed_;
RowsStore &remain = processed.is_empty() ? processed : current;
const bool need_split_store = processed.is_empty();
bool found_next_part = false;
OZ(check_stack_overflow());
while (OB_SUCC(ret) && !found_next_part) {
// handle current batch
while (OB_SUCC(ret) && !found_next_part) { // handle current batch
const ObBitVector &skip = *child_brs->skip_;
while (OB_SUCC(ret) && row_idx < child_brs->size_ && !found_next_part) {
bool same_part = false;
guard.set_batch_idx(row_idx);
int64_t row_cnt_inc = 0;
if (skip.contain(row_idx)) {
++row_idx;
continue;
} else if (OB_FAIL(check_same_partition(*first, same_part))) {
LOG_WARN("check same partition failed", K(ret));
} else if (!same_part) {
if (OB_FAIL(check_wf_same_partition(end))) {
LOG_WARN("check wf same partition failed", K(ret));
} else if (OB_FAIL(found_part_end(end, input_rows_.cur_))) {
LOG_WARN("found_part_end failed", K(ret));
} else if (end != wf_list_.get_header()) {
if (OB_FAIL(found_new_part(false))) {
LOG_WARN("save partition by exprs failed", K(ret));
} else if (OB_FAIL(current.add_row_with_index(row_idx, get_all_expr(),
&eval_ctx_, NULL, false))) {
LOG_WARN("add row to rows_store failed", K(ret));
bool need_loop_until_child_brs_end = true;
while (OB_SUCC(ret) && need_loop_until_child_brs_end) {
need_loop_until_child_brs_end = false;
found_next_part = false;
RowsStore &current = *input_rows_.cur_;
RowsStore &processed = *input_rows_.processed_;
// When find a big partition, store remaining rows of the batch to %remain row store
RowsStore &remain = processed.is_empty() ? processed : current;
const bool need_split_store = processed.is_empty();
while (OB_SUCC(ret) && row_idx < child_brs->size_ && !found_next_part) {
bool same_part = false;
guard.set_batch_idx(row_idx);
int64_t row_cnt_inc = 0;
if (skip.contain(row_idx)) {
++row_idx;
continue;
} else if (OB_FAIL(check_same_partition(*first, same_part))) {
LOG_WARN("check same partition failed", K(ret));
} else if (!same_part) {
if (OB_FAIL(check_wf_same_partition(end))) {
LOG_WARN("check wf same partition failed", K(ret));
} else if (OB_FAIL(found_part_end(end, input_rows_.cur_))) {
LOG_WARN("found_part_end failed", K(ret));
} else if (end != wf_list_.get_header()) {
if (OB_FAIL(found_new_part(false))) {
LOG_WARN("save partition by exprs failed", K(ret));
} else if (OB_FAIL(current.add_row_with_index(row_idx, get_all_expr(),
&eval_ctx_, NULL, false))) {
LOG_WARN("add row to rows_store failed", K(ret));
} else {
// the biggest part end : remain row_cnt_ + 0, store_row_cnt_ + 1
// other parts end : after compute_wf_values, current row_cnt_ + 1, store_row_cnt_ + 1
row_cnt_inc = 1;
}
} else {
// the biggest part end : remain row_cnt_ + 0, store_row_cnt_ + 1
// other parts end : after compute_wf_values, current row_cnt_ + 1, store_row_cnt_ + 1
row_cnt_inc = 1;
found_next_part = true;
if (OB_FAIL(found_new_part(false))) {
LOG_WARN("save partition by exprs failed", K(ret));
} else if (OB_FAIL(remain.add_row_with_index(row_idx, get_all_expr(), &eval_ctx_,
NULL, false))) {
LOG_WARN("add row to rows_store failed", K(ret));
}
}
} else {
found_next_part = true;
if (OB_FAIL(found_new_part(false))) {
if (OB_FAIL(ret)) {
} else if (OB_FAIL(compute_wf_values(end, check_times))) {
LOG_WARN("compute wf values failed", K(ret));
} else if (OB_FAIL(save_part_first_row_idx())) {
LOG_WARN("save partition by exprs failed", K(ret));
} else if (OB_FAIL(remain.add_row_with_index(row_idx, get_all_expr(), &eval_ctx_,
NULL, false))) {
} else {
// compute_wf_values will use current.row_cnt to decide which rows need to compute
// so add row_cnt_inc to current.row_cnt after compute_wf_values
current.row_cnt_ += row_cnt_inc;
}
} else { // same part
if (MY_SPEC.is_participator()) {
MY_SPEC.wf_aggr_status_expr_->locate_datum_for_write(eval_ctx_).set_int(last_aggr_status_);
}
if (OB_FAIL(current.add_row_with_index(row_idx, get_all_expr(), &eval_ctx_))) {
LOG_WARN("add row to rows_store failed", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(compute_wf_values(end, check_times))) {
LOG_WARN("compute wf values failed", K(ret));
} else if (OB_FAIL(save_part_first_row_idx())) {
LOG_WARN("save partition by exprs failed", K(ret));
} else {
// compute_wf_values will use current.row_cnt to decide which rows need to compute
// so add row_cnt_inc to current.row_cnt after compute_wf_values
current.row_cnt_ += row_cnt_inc;
}
} else { // same part
if (MY_SPEC.is_participator()) {
MY_SPEC.wf_aggr_status_expr_->locate_datum_for_write(eval_ctx_).set_int(last_aggr_status_);
}
if (OB_FAIL(current.add_row_with_index(row_idx, get_all_expr(), &eval_ctx_))) {
LOG_WARN("add row to rows_store failed", K(ret));
if (OB_SUCC(ret) && !found_next_part) {
++row_idx;
}
}
if (OB_SUCC(ret) && !found_next_part) {
++row_idx;
}
}
if (OB_SUCC(ret)) {
if (found_next_part) {
if (need_split_store) {
// switch %cur_ and %processed_ row store if necessary
// %cur_ always be rows store we are computing.
// And rows in %processed_ row store are all computed and ready to output.
// swap $cur_ and $processed_ row store
foreach_stores([](Stores &s){ std::swap(s.cur_, s.processed_); return OB_SUCCESS; });
if (MY_SPEC.range_dist_parallel_ && !first_part_saved_) {
first_part_saved_ = true;
foreach_stores([](Stores &s){ std::swap(s.first_, s.processed_); return OB_SUCCESS; });
if (OB_SUCC(ret)) {
if (found_next_part) {
if (need_split_store) {
// switch %cur_ and %processed_ row store if necessary
// %cur_ always be rows store we are computing.
// And rows in %processed_ row store are all computed and ready to output.
// swap $cur_ and $processed_ row store
foreach_stores([](Stores &s){ std::swap(s.cur_, s.processed_); return OB_SUCCESS; });
if (MY_SPEC.range_dist_parallel_ && !first_part_saved_) {
first_part_saved_ = true;
foreach_stores([](Stores &s){ std::swap(s.first_, s.processed_); return OB_SUCCESS; });
}
}
}
++row_idx;
++remain.row_cnt_;
if (need_split_store && OB_FAIL(save_part_first_row_idx())) {
LOG_WARN("save partition by exprs failed", K(ret));
} else if (OB_FAIL(process_child_batch(row_idx, child_brs, check_times))) {
LOG_WARN("add row to rows_store failed", K(ret));
}
} else if (!child_iter_end_) {
if (need_split_store) { // need_split_store is true means we have not found next part ever
if (OB_FAIL(get_next_batch_from_child(batch_size, child_brs))) {
LOG_WARN("get child next batch failed", K(ret));
++row_idx;
++remain.row_cnt_;
if (need_split_store && OB_FAIL(save_part_first_row_idx())) {
LOG_WARN("save partition by exprs failed", K(ret));
} else { // need to deal with remain rows in this child_brs after found new part
need_loop_until_child_brs_end = true;
}
} else if (!child_iter_end_) {
if (need_split_store) { // need_split_store is true means we have not found next part ever
if (OB_FAIL(get_next_batch_from_child(batch_size, child_brs))) {
LOG_WARN("get child next batch failed", K(ret));
} else {
child_iter_end_ = child_brs->end_;
row_idx = 0;
guard.set_batch_size(child_brs->size_);
}
} else {
child_iter_end_ = child_brs->end_;
row_idx = 0;
guard.set_batch_size(child_brs->size_);
found_next_part = true;
current.row_cnt_ = wf_list_.get_last()->part_first_row_idx_;
}
} else {
} else { // child_iter_end_
found_next_part = true;
current.row_cnt_ = wf_list_.get_last()->part_first_row_idx_;
}
} else { // child_iter_end_
found_next_part = true;
// add aggr result row for the last part
if (OB_FAIL(found_part_end(wf_list_.get_header(), input_rows_.cur_))) {
LOG_WARN("found_part_end failed", K(ret), K(last_aggr_status_));
} else if (OB_FAIL(compute_wf_values(wf_list_.get_header(), check_times))) {
LOG_WARN("compute wf values failed", K(ret));
// add aggr result row for the last part
if (OB_FAIL(found_part_end(wf_list_.get_header(), input_rows_.cur_))) {
LOG_WARN("found_part_end failed", K(ret), K(last_aggr_status_));
} else if (OB_FAIL(compute_wf_values(wf_list_.get_header(), check_times))) {
LOG_WARN("compute wf values failed", K(ret));
}
}
}
}