diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_receive_op.cpp index 81ed89840e..cda0d5d7fa 100644 --- a/src/sql/engine/px/exchange/ob_px_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive_op.cpp @@ -394,24 +394,10 @@ int ObPxReceiveOp::inner_rescan() int ObPxReceiveOp::inner_drain_exch() { int ret = OB_SUCCESS; - uint64_t version = -1; if (iter_end_) { exch_drained_ = true; } else if (!exch_drained_) { - if (IS_PX_COORD(get_spec().get_type())) { - /** - * 为什么qc的drain什么操作都不需要做? - * qc调用drain_exch有两种情况,第一种是它自己inner get next row - * 网上返回了ob iter end,这种不会走到这里。 - * 第二种是类似这种计划 - * merge join - * QC1 QC2 - * QC1它如果end了,主线程会去调用QC2的drain exch。这种情况下我们 - * 根本不需要做任何处理,因为上层已经获得所有行,很快就会调用inner close - * 来结束所有的dfo,没必要这里先发一次终止消息出去。 - */ - LOG_TRACE("drain QC"); - } else if (OB_FAIL(try_link_channel())) { + if (OB_FAIL(try_link_channel())) { LOG_WARN("failed to link channel", K(ret)); } else if (OB_FAIL(active_all_receive_channel())) { LOG_WARN("failed to active all receive channel", K(ret)); diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index 663bb37973..293d8a2c0c 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -607,6 +607,37 @@ int ObPxCoordOp::inner_close() return ret; } +int ObPxCoordOp::inner_drain_exch() +{ + int ret = OB_SUCCESS; + LOG_TRACE("drain QC", K(get_spec().id_), K(iter_end_), K(exch_drained_), + K(enable_px_batch_rescan()), K(lbt())); + /** + * why different from receive operator? + * 1. Why not need try link channel. + * There are two situations when qc call drain_exch. + * The first is qc return iter_end when inner_get_next_row, in this case, it will not reach here. + * The second is plan like this: + * merge join + * QC1 QC2 + * If QC1 ends, the main thread will call drain exch of QC2. + * In this situation, no action is required because the upper operator has already got enough rows + * and will call inner_close to terminate all dfos soon. + * Therefore, there is no need to send a termination message + * 2. Why not drain channels if enable px batch rescan? + * If use px batch rescan, drain channel may lead to missing results of other params in the batch. + */ + if (enable_px_batch_rescan()) { + // do nothing + } else if (iter_end_) { + exch_drained_ = true; + } else if (!exch_drained_) { + dfc_.drain_all_channels(); + exch_drained_ = true; + } + return ret; +} + int ObPxCoordOp::destroy_all_channel() { int ret = OB_SUCCESS; diff --git a/src/sql/engine/px/ob_px_coord_op.h b/src/sql/engine/px/ob_px_coord_op.h index 10d65464af..45a3136140 100644 --- a/src/sql/engine/px/ob_px_coord_op.h +++ b/src/sql/engine/px/ob_px_coord_op.h @@ -61,6 +61,7 @@ public: allocator_.reset(); ObPxReceiveOp::destroy(); } + virtual int inner_drain_exch() override; void reset_for_rescan() { coord_info_.reset_for_rescan();