From b716ff97bf7a85a2537a30ef96b553da207a4f76 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 21 Sep 2023 13:43:54 +0000 Subject: [PATCH] fix bug: partition filter GI with NLJ can not rescan --- src/sql/engine/px/ob_granule_iterator_op.cpp | 25 +++++++++++--------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/sql/engine/px/ob_granule_iterator_op.cpp b/src/sql/engine/px/ob_granule_iterator_op.cpp index cd02fdab3..7155023cd 100644 --- a/src/sql/engine/px/ob_granule_iterator_op.cpp +++ b/src/sql/engine/px/ob_granule_iterator_op.cpp @@ -247,7 +247,7 @@ int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info) int ret = OB_SUCCESS; ObGranulePump *gi_task_pump = nullptr; const ObGITaskSet *taskset = NULL; - int64_t pos; + int64_t pos = 0; //init if (is_not_init()) { ret = OB_ERR_UNEXPECTED; @@ -315,11 +315,6 @@ int ObGranuleIteratorOp::rescan() int ret = ObOperator::inner_rescan(); CK(NULL != pump_); if (OB_FAIL(ret)) { - } else if (!ObGranuleUtil::is_partition_task_mode(MY_SPEC.gi_attri_flag_) && - ObGranuleUtil::partition_filter(MY_SPEC.gi_attri_flag_)) { - // no this plan - ret = OB_ERR_UNEXPECTED; - LOG_WARN("partition filter GI cannot rescan", K(ret)); } else if (pump_version_ != pump_->get_pump_version()) { // We can not reused the processed tasks when task regenerated (pump version changed). // e.g.: px batch rescan. @@ -347,7 +342,7 @@ int ObGranuleIteratorOp::rescan() // NJ call rescan before iterator rows, need to nothing for the first scan. } else if (GI_PREPARED == state_) { // At the open-stage we get a granule task, and now, we fetch all the granule task. - while(OB_SUCC(get_next_granule_task())) {} + while (OB_SUCC(get_next_granule_task())) {} if (ret != OB_ITER_END) { LOG_WARN("failed to get all granule task", K(ret)); } else { @@ -358,10 +353,6 @@ int ObGranuleIteratorOp::rescan() ret = OB_NOT_SUPPORTED; LOG_USER_ERROR(OB_NOT_SUPPORTED, "rescan before all task in gi"); LOG_WARN("rescan before all task fetched", K(ret), K(state_)); - // core dump by design: -#ifdef ENABLE_DEBUG_LOG - abort_unless(false); -#endif } } if (OB_SUCC(ret)) { @@ -420,10 +411,17 @@ int ObGranuleIteratorOp::inner_open() ret = OB_NOT_INIT; LOG_WARN("child_op is null", K(ret)); } else if (MY_SPEC.bf_info_.is_inited_) { + // prepare_table_scan can get a gi task during open stage, + // but during open stage, partition runtime filter may not ready, we cannot sure wthether + // do pruning or not at this moment, if we not do + // so we skip the get task process, the all tasks can be pruning during open or rescan stage. ObGIOpInput *input = static_cast(input_); rf_key_.task_id_ = MY_SPEC.bf_info_.is_shared_? 0 : worker_id_; rf_key_.px_sequence_id_ = input->px_sequence_id_; rf_key_.p2p_datahub_id_ = MY_SPEC.bf_info_.p2p_dh_id_; + // when partition runtime filter pushdown to the right TSC child of NLJ, + // we must set state_ = GI_PREPARED to get all tasks during rescan + state_ = GI_PREPARED; } else if (OB_FAIL(prepare_table_scan())) { LOG_WARN("prepare table scan failed", K(ret)); } @@ -590,12 +588,14 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool &partition_pruning) LOG_WARN("fail to do parallel runtime filter pruning", K(ret)); } else { state_ = GI_END; + all_task_fetched_ = true; } } else if (OB_FAIL(try_fetch_task(gi_task_info))) { if (OB_ITER_END != ret) { LOG_WARN("try fetch task failed", K(ret)); } else { state_ = GI_END; + all_task_fetched_ = true; } } else { if (OB_NOT_NULL(gi_prepare_map->get(tsc_op_id_))) { @@ -965,6 +965,7 @@ int ObGranuleIteratorOp::fetch_rescan_pw_task_infos(const common::ObIArray= pwj_rescan_task_infos_.count()) { ret = OB_ITER_END; state_ = GI_END; + all_task_fetched_ = true; } else { if (ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_)) { bool partition_pruned = false; @@ -977,6 +978,7 @@ int ObGranuleIteratorOp::fetch_rescan_pw_task_infos(const common::ObIArray= pwj_rescan_task_infos_.count()) { ret = OB_ITER_END; state_ = GI_END; + all_task_fetched_ = true; } else if (OB_UNLIKELY(rescan_task_idx_ + repart_idx >= pwj_rescan_task_infos_.count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected pwj_rescan_task_infos_ count", K(ret), K(rescan_task_idx_), @@ -1030,6 +1032,7 @@ int ObGranuleIteratorOp::fetch_normal_pw_task_infos(const common::ObIArray