diff --git a/src/sql/engine/ob_operator.cpp b/src/sql/engine/ob_operator.cpp index 917a311cf..e5b7afb0e 100644 --- a/src/sql/engine/ob_operator.cpp +++ b/src/sql/engine/ob_operator.cpp @@ -1207,10 +1207,6 @@ int ObOperator::get_next_row() } } else if (OB_ITER_END == ret) { row_reach_end_ = true; - int tmp_ret = do_drain_exch(); - if (OB_SUCCESS != tmp_ret) { - LOG_WARN("drain exchange data failed", K(tmp_ret)); - } if (got_first_row_) { op_monitor_info_.last_row_time_ = oceanbase::common::ObClockGenerator::getClock(); } @@ -1231,6 +1227,12 @@ int ObOperator::get_next_row() } } } + if (OB_FAIL(ret)) { + int tmp_ret = do_drain_exch(); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("drain exchange data failed", K(tmp_ret)); + } + } end_ash_line_id_reg(); end_cpu_time_counting(); return ret; @@ -1239,6 +1241,7 @@ int ObOperator::get_next_row() int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&batch_rows) { int ret = OB_SUCCESS; + bool need_drain = false; begin_cpu_time_counting(); begin_ash_line_id_reg(); @@ -1366,10 +1369,7 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba got_first_row_ = true; } if (brs_.end_) { - int tmp_ret = do_drain_exch(); - if (OB_SUCCESS != tmp_ret) { - LOG_WARN("drain exchange data failed", K(tmp_ret)); - } + need_drain = true; op_monitor_info_.last_row_time_ = ObClockGenerator::getClock(); } } @@ -1395,7 +1395,12 @@ int ObOperator::get_next_batch(const int64_t max_row_cnt, const ObBatchRows *&ba if (!spec_.use_rich_format_ && PHY_TABLE_SCAN != spec_.type_) { brs_.set_all_rows_active(false); } - + if (need_drain || OB_FAIL(ret)) { + int tmp_ret = do_drain_exch(); + if (OB_SUCCESS != tmp_ret) { + LOG_WARN("drain exchange data failed", K(tmp_ret)); + } + } end_ash_line_id_reg(); end_cpu_time_counting(); return ret;