diff --git a/src/sql/engine/ob_operator.cpp b/src/sql/engine/ob_operator.cpp index e5b7afb0e9..917a311cf0 100644 --- a/src/sql/engine/ob_operator.cpp +++ b/src/sql/engine/ob_operator.cpp @@ -1207,6 +1207,10 @@ int ObOperator::get_next_row() } } else if (OB_ITER_END == ret) { row_reach_end_ = true; + int tmp_ret = do_drain_exch(); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("drain exchange data failed", K(tmp_ret)); + } if (got_first_row_) { op_monitor_info_.last_row_time_ = oceanbase::common::ObClockGenerator::getClock(); } @@ -1227,12 +1231,6 @@ int ObOperator::get_next_row() } } } - if (OB_FAIL(ret)) { - int tmp_ret = do_drain_exch(); - if (OB_SUCCESS != tmp_ret) { - LOG_WARN("drain exchange data failed", K(tmp_ret)); - } - } end_ash_line_id_reg(); end_cpu_time_counting(); return ret; @@ -1241,7 +1239,6 @@ int ObOperator::get_next_row() int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&batch_rows) { int ret = OB_SUCCESS; - bool need_drain = false; begin_cpu_time_counting(); begin_ash_line_id_reg(); @@ -1369,7 +1366,10 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba got_first_row_ = true; } if (brs_.end_) { - need_drain = true; + int tmp_ret = do_drain_exch(); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("drain exchange data failed", K(tmp_ret)); + } op_monitor_info_.last_row_time_ = ObClockGenerator::getClock(); } } @@ -1395,12 +1395,7 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba if (!spec_.use_rich_format_ && PHY_TABLE_SCAN != spec_.type_) { brs_.set_all_rows_active(false); } - if (need_drain || OB_FAIL(ret)) { - int tmp_ret = do_drain_exch(); - if (OB_SUCCESS != tmp_ret) { - LOG_WARN("drain exchange data failed", K(tmp_ret)); - } - } + end_ash_line_id_reg(); end_cpu_time_counting(); return ret;