Fix missing-results bug caused by draining exchange channels when px batch rescan is enabled

This commit is contained in:
sdc
2023-07-18 02:12:14 +00:00
committed by ob-robot
parent 63ec91d4c4
commit f8781fb586
3 changed files with 33 additions and 15 deletions

View File

@ -394,24 +394,10 @@ int ObPxReceiveOp::inner_rescan()
int ObPxReceiveOp::inner_drain_exch() int ObPxReceiveOp::inner_drain_exch()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
uint64_t version = -1;
if (iter_end_) { if (iter_end_) {
exch_drained_ = true; exch_drained_ = true;
} else if (!exch_drained_) { } else if (!exch_drained_) {
if (IS_PX_COORD(get_spec().get_type())) { if (OB_FAIL(try_link_channel())) {
/**
* 为什么qc的drain什么操作都不需要做?
* qc调用drain_exch有两种情况,第一种是它自己inner get next row
* 网上返回了ob iter end,这种不会走到这里。
* 第二种是类似这种计划
* merge join
* QC1 QC2
* QC1它如果end了,主线程会去调用QC2的drain exch。这种情况下我们
* 根本不需要做任何处理,因为上层已经获得所有行,很快就会调用inner close
* 来结束所有的dfo,没必要这里先发一次终止消息出去。
*/
LOG_TRACE("drain QC");
} else if (OB_FAIL(try_link_channel())) {
LOG_WARN("failed to link channel", K(ret)); LOG_WARN("failed to link channel", K(ret));
} else if (OB_FAIL(active_all_receive_channel())) { } else if (OB_FAIL(active_all_receive_channel())) {
LOG_WARN("failed to active all receive channel", K(ret)); LOG_WARN("failed to active all receive channel", K(ret));

View File

@ -607,6 +607,37 @@ int ObPxCoordOp::inner_close()
return ret; return ret;
} }
int ObPxCoordOp::inner_drain_exch()
{
  int ret = OB_SUCCESS;
  LOG_TRACE("drain QC", K(get_spec().id_), K(iter_end_), K(exch_drained_),
            K(enable_px_batch_rescan()), K(lbt()));
  /**
   * Drain handling for the QC, and why it differs from a receive operator.
   *
   * 1. try_link_channel is not needed here.
   *    drain_exch reaches a QC in one of two ways:
   *      a) the QC itself returned iter_end from inner_get_next_row; that
   *         path never arrives at this point;
   *      b) a plan shaped like
   *                merge join
   *              QC1        QC2
   *         where QC1 finishes and the main thread then calls drain exch on
   *         QC2. The upper operator already has enough rows and will call
   *         inner_close shortly to terminate all dfos, so no termination
   *         message has to be sent from here.
   *
   * 2. Channels are left alone when px batch rescan is enabled.
   *    Draining them in that mode may lose the results that belong to the
   *    other params of the same batch.
   */
  if (!enable_px_batch_rescan()) {
    // Channels only need draining while rows may still be pending and no
    // earlier drain has already happened.
    const bool need_drain = !iter_end_ && !exch_drained_;
    if (need_drain) {
      dfc_.drain_all_channels();
    }
    // Outside batch rescan the exchange always ends up marked as drained;
    // when the flag is already set this assignment changes nothing.
    exch_drained_ = true;
  }
  return ret;
}
int ObPxCoordOp::destroy_all_channel() int ObPxCoordOp::destroy_all_channel()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -61,6 +61,7 @@ public:
allocator_.reset(); allocator_.reset();
ObPxReceiveOp::destroy(); ObPxReceiveOp::destroy();
} }
// Overrides the inherited inner_drain_exch() hook; returns an int status code.
// NOTE(review): presumably this is the ObPxCoordOp declaration matching the
// ObPxCoordOp::inner_drain_exch definition -- confirm against the enclosing class.
virtual int inner_drain_exch() override;
void reset_for_rescan() void reset_for_rescan()
{ {
coord_info_.reset_for_rescan(); coord_info_.reset_for_rescan();