diff --git a/src/sql/engine/set/ob_hash_union_vec_op.cpp b/src/sql/engine/set/ob_hash_union_vec_op.cpp index e8e79a3f6..b7ee9705d 100644 --- a/src/sql/engine/set/ob_hash_union_vec_op.cpp +++ b/src/sql/engine/set/ob_hash_union_vec_op.cpp @@ -106,79 +106,81 @@ int ObHashUnionVecOp::inner_get_next_batch(const int64_t max_row_cnt) ObBitVector *output_vec = nullptr; const ObBatchRows *child_brs = nullptr; - if (!has_got_part_) { - if (child_op_end) { - end_to_process = true; - } else if (OB_FAIL(get_child_next_batch(batch_size, child_brs))) { - LOG_WARN("failed to get child next batch", K(ret)); - } else if (OB_FAIL(convert_vector(cur_child_op_->get_spec().output_, - MY_SPEC.set_exprs_, - child_brs))) { - LOG_WARN("copy current row failed", K(ret)); - } else if (OB_FAIL(hp_infras_.calc_hash_value_for_batch(MY_SPEC.set_exprs_, - *child_brs, - hash_values_for_batch_))) { - LOG_WARN("failed to calc hash value for batch", K(ret)); - } else { - child_op_end = cur_child_op_ == right_ && child_brs->end_ && 0 != child_brs->size_; - end_to_process = cur_child_op_ == right_ && child_brs->end_ && 0 == child_brs->size_; - read_rows = child_brs->size_; - } - } else if (OB_FAIL(hp_infras_.get_left_next_batch(MY_SPEC.set_exprs_, - batch_size, - read_rows, - hash_values_for_batch_))) { - if (OB_ITER_END == ret) { - ret = OB_SUCCESS; - end_to_process = true; - } else { - LOG_WARN("failed to get batch from infra", K(ret)); - } - } - if (OB_SUCC(ret) && end_to_process) { - end_to_process = false; - if (OB_FAIL(hp_infras_.finish_insert_row())) { - LOG_WARN("failed to finish insert row", K(ret)); - } else if (!has_got_part_) { - has_got_part_ = true; - } else if (OB_FAIL(hp_infras_.close_cur_part(InputSide::LEFT))) { - LOG_WARN("failed to close cur part", K(ret)); - } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(hp_infras_.end_round())) { - LOG_WARN("failed to end round", K(ret)); - } else if (OB_FAIL(try_check_status())) { - LOG_WARN("failed to check status", K(ret)); - } else if (OB_FAIL(hp_infras_.start_round())) { - LOG_WARN("failed to start round", K(ret)); - } else if (OB_FAIL(hp_infras_.get_next_partition(InputSide::LEFT))) { - if (OB_ITER_END != ret) { - LOG_WARN("failed to get next dumped partition", K(ret)); + if (OB_SUCC(ret)) { + if (!has_got_part_) { + if (child_op_end) { + end_to_process = true; + } else if (OB_FAIL(get_child_next_batch(batch_size, child_brs))) { + LOG_WARN("failed to get child next batch", K(ret)); + } else if (OB_FAIL(convert_vector(cur_child_op_->get_spec().output_, + MY_SPEC.set_exprs_, + child_brs))) { + LOG_WARN("copy current row failed", K(ret)); + } else if (OB_FAIL(hp_infras_.calc_hash_value_for_batch(MY_SPEC.set_exprs_, + *child_brs, + hash_values_for_batch_))) { + LOG_WARN("failed to calc hash value for batch", K(ret)); + } else { + child_op_end = cur_child_op_ == right_ && child_brs->end_ && 0 != child_brs->size_; + end_to_process = cur_child_op_ == right_ && child_brs->end_ && 0 == child_brs->size_; + read_rows = child_brs->size_; + } + } else if (OB_FAIL(hp_infras_.get_left_next_batch(MY_SPEC.set_exprs_, + batch_size, + read_rows, + hash_values_for_batch_))) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + end_to_process = true; + } else { + LOG_WARN("failed to get batch from infra", K(ret)); } - } else if (OB_FAIL(hp_infras_.open_cur_part(InputSide::LEFT))) { - LOG_WARN("failed to open cur part", K(ret)); - } else if (OB_FAIL(hp_infras_.resize(hp_infras_.get_cur_part_row_cnt(InputSide::LEFT)))) { - LOG_WARN("failed to resize cur part", K(ret)); } - } else if (OB_FAIL(ret)) { - } else if (has_got_part_ && OB_FAIL(hp_infras_.insert_row_for_batch(MY_SPEC.set_exprs_, - hash_values_for_batch_, - read_rows, - nullptr, - output_vec))) { - LOG_WARN("failed to insert batch for dump", K(ret)); - } else if (!has_got_part_ && OB_FAIL(hp_infras_.insert_row_for_batch(MY_SPEC.set_exprs_, + if (OB_SUCC(ret) && end_to_process) { + end_to_process = false; + if (OB_FAIL(hp_infras_.finish_insert_row())) { + LOG_WARN("failed to finish insert row", K(ret)); + } else if (!has_got_part_) { + has_got_part_ = true; + } else if (OB_FAIL(hp_infras_.close_cur_part(InputSide::LEFT))) { + LOG_WARN("failed to close cur part", K(ret)); + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(hp_infras_.end_round())) { + LOG_WARN("failed to end round", K(ret)); + } else if (OB_FAIL(try_check_status())) { + LOG_WARN("failed to check status", K(ret)); + } else if (OB_FAIL(hp_infras_.start_round())) { + LOG_WARN("failed to start round", K(ret)); + } else if (OB_FAIL(hp_infras_.get_next_partition(InputSide::LEFT))) { + if (OB_ITER_END != ret) { + LOG_WARN("failed to get next dumped partition", K(ret)); + } + } else if (OB_FAIL(hp_infras_.open_cur_part(InputSide::LEFT))) { + LOG_WARN("failed to open cur part", K(ret)); + } else if (OB_FAIL(hp_infras_.resize(hp_infras_.get_cur_part_row_cnt(InputSide::LEFT)))) { + LOG_WARN("failed to resize cur part", K(ret)); + } + } else if (OB_FAIL(ret)) { + } else if (has_got_part_ && OB_FAIL(hp_infras_.insert_row_for_batch(MY_SPEC.set_exprs_, hash_values_for_batch_, read_rows, - child_brs->skip_, + nullptr, output_vec))) { - LOG_WARN("failed to insert batch for no dump", K(ret)); - } else if (OB_ISNULL(output_vec)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to get output vec", K(ret)); - } else { - brs_.size_ = read_rows; - brs_.skip_->deep_copy(*output_vec, read_rows); + LOG_WARN("failed to insert batch for dump", K(ret)); + } else if (!has_got_part_ && OB_FAIL(hp_infras_.insert_row_for_batch(MY_SPEC.set_exprs_, + hash_values_for_batch_, + read_rows, + child_brs->skip_, + output_vec))) { + LOG_WARN("failed to insert batch for no dump", K(ret)); + } else if (OB_ISNULL(output_vec)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get output vec", K(ret)); + } else { + brs_.size_ = read_rows; + brs_.skip_->deep_copy(*output_vec, read_rows); + } }