fix bug: partition filter GI with NLJ can not rescan
This commit is contained in:
@ -247,7 +247,7 @@ int ObGranuleIteratorOp::try_fetch_task(ObGranuleTaskInfo &info)
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObGranulePump *gi_task_pump = nullptr;
|
ObGranulePump *gi_task_pump = nullptr;
|
||||||
const ObGITaskSet *taskset = NULL;
|
const ObGITaskSet *taskset = NULL;
|
||||||
int64_t pos;
|
int64_t pos = 0;
|
||||||
//init
|
//init
|
||||||
if (is_not_init()) {
|
if (is_not_init()) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
@ -315,11 +315,6 @@ int ObGranuleIteratorOp::rescan()
|
|||||||
int ret = ObOperator::inner_rescan();
|
int ret = ObOperator::inner_rescan();
|
||||||
CK(NULL != pump_);
|
CK(NULL != pump_);
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
} else if (!ObGranuleUtil::is_partition_task_mode(MY_SPEC.gi_attri_flag_) &&
|
|
||||||
ObGranuleUtil::partition_filter(MY_SPEC.gi_attri_flag_)) {
|
|
||||||
// no this plan
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("partition filter GI cannot rescan", K(ret));
|
|
||||||
} else if (pump_version_ != pump_->get_pump_version()) {
|
} else if (pump_version_ != pump_->get_pump_version()) {
|
||||||
// We can not reused the processed tasks when task regenerated (pump version changed).
|
// We can not reused the processed tasks when task regenerated (pump version changed).
|
||||||
// e.g.: px batch rescan.
|
// e.g.: px batch rescan.
|
||||||
@ -347,7 +342,7 @@ int ObGranuleIteratorOp::rescan()
|
|||||||
// NJ call rescan before iterator rows, need to nothing for the first scan.
|
// NJ call rescan before iterator rows, need to nothing for the first scan.
|
||||||
} else if (GI_PREPARED == state_) {
|
} else if (GI_PREPARED == state_) {
|
||||||
// At the open-stage we get a granule task, and now, we fetch all the granule task.
|
// At the open-stage we get a granule task, and now, we fetch all the granule task.
|
||||||
while(OB_SUCC(get_next_granule_task())) {}
|
while (OB_SUCC(get_next_granule_task())) {}
|
||||||
if (ret != OB_ITER_END) {
|
if (ret != OB_ITER_END) {
|
||||||
LOG_WARN("failed to get all granule task", K(ret));
|
LOG_WARN("failed to get all granule task", K(ret));
|
||||||
} else {
|
} else {
|
||||||
@ -358,10 +353,6 @@ int ObGranuleIteratorOp::rescan()
|
|||||||
ret = OB_NOT_SUPPORTED;
|
ret = OB_NOT_SUPPORTED;
|
||||||
LOG_USER_ERROR(OB_NOT_SUPPORTED, "rescan before all task in gi");
|
LOG_USER_ERROR(OB_NOT_SUPPORTED, "rescan before all task in gi");
|
||||||
LOG_WARN("rescan before all task fetched", K(ret), K(state_));
|
LOG_WARN("rescan before all task fetched", K(ret), K(state_));
|
||||||
// core dump by design:
|
|
||||||
#ifdef ENABLE_DEBUG_LOG
|
|
||||||
abort_unless(false);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
@ -420,10 +411,17 @@ int ObGranuleIteratorOp::inner_open()
|
|||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("child_op is null", K(ret));
|
LOG_WARN("child_op is null", K(ret));
|
||||||
} else if (MY_SPEC.bf_info_.is_inited_) {
|
} else if (MY_SPEC.bf_info_.is_inited_) {
|
||||||
|
// prepare_table_scan can get a gi task during open stage,
|
||||||
|
// but during open stage, partition runtime filter may not ready, we cannot sure wthether
|
||||||
|
// do pruning or not at this moment, if we not do
|
||||||
|
// so we skip the get task process, the all tasks can be pruning during open or rescan stage.
|
||||||
ObGIOpInput *input = static_cast<ObGIOpInput*>(input_);
|
ObGIOpInput *input = static_cast<ObGIOpInput*>(input_);
|
||||||
rf_key_.task_id_ = MY_SPEC.bf_info_.is_shared_? 0 : worker_id_;
|
rf_key_.task_id_ = MY_SPEC.bf_info_.is_shared_? 0 : worker_id_;
|
||||||
rf_key_.px_sequence_id_ = input->px_sequence_id_;
|
rf_key_.px_sequence_id_ = input->px_sequence_id_;
|
||||||
rf_key_.p2p_datahub_id_ = MY_SPEC.bf_info_.p2p_dh_id_;
|
rf_key_.p2p_datahub_id_ = MY_SPEC.bf_info_.p2p_dh_id_;
|
||||||
|
// when partition runtime filter pushdown to the right TSC child of NLJ,
|
||||||
|
// we must set state_ = GI_PREPARED to get all tasks during rescan
|
||||||
|
state_ = GI_PREPARED;
|
||||||
} else if (OB_FAIL(prepare_table_scan())) {
|
} else if (OB_FAIL(prepare_table_scan())) {
|
||||||
LOG_WARN("prepare table scan failed", K(ret));
|
LOG_WARN("prepare table scan failed", K(ret));
|
||||||
}
|
}
|
||||||
@ -590,12 +588,14 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool &partition_pruning)
|
|||||||
LOG_WARN("fail to do parallel runtime filter pruning", K(ret));
|
LOG_WARN("fail to do parallel runtime filter pruning", K(ret));
|
||||||
} else {
|
} else {
|
||||||
state_ = GI_END;
|
state_ = GI_END;
|
||||||
|
all_task_fetched_ = true;
|
||||||
}
|
}
|
||||||
} else if (OB_FAIL(try_fetch_task(gi_task_info))) {
|
} else if (OB_FAIL(try_fetch_task(gi_task_info))) {
|
||||||
if (OB_ITER_END != ret) {
|
if (OB_ITER_END != ret) {
|
||||||
LOG_WARN("try fetch task failed", K(ret));
|
LOG_WARN("try fetch task failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
state_ = GI_END;
|
state_ = GI_END;
|
||||||
|
all_task_fetched_ = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (OB_NOT_NULL(gi_prepare_map->get(tsc_op_id_))) {
|
if (OB_NOT_NULL(gi_prepare_map->get(tsc_op_id_))) {
|
||||||
@ -965,6 +965,7 @@ int ObGranuleIteratorOp::fetch_rescan_pw_task_infos(const common::ObIArray<int64
|
|||||||
} else if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) {
|
} else if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) {
|
||||||
ret = OB_ITER_END;
|
ret = OB_ITER_END;
|
||||||
state_ = GI_END;
|
state_ = GI_END;
|
||||||
|
all_task_fetched_ = true;
|
||||||
} else {
|
} else {
|
||||||
if (ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_)) {
|
if (ObGranuleUtil::enable_partition_pruning(MY_SPEC.gi_attri_flag_)) {
|
||||||
bool partition_pruned = false;
|
bool partition_pruned = false;
|
||||||
@ -977,6 +978,7 @@ int ObGranuleIteratorOp::fetch_rescan_pw_task_infos(const common::ObIArray<int64
|
|||||||
if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) {
|
if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) {
|
||||||
ret = OB_ITER_END;
|
ret = OB_ITER_END;
|
||||||
state_ = GI_END;
|
state_ = GI_END;
|
||||||
|
all_task_fetched_ = true;
|
||||||
} else if (OB_UNLIKELY(rescan_task_idx_ + repart_idx >= pwj_rescan_task_infos_.count())) {
|
} else if (OB_UNLIKELY(rescan_task_idx_ + repart_idx >= pwj_rescan_task_infos_.count())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected pwj_rescan_task_infos_ count", K(ret), K(rescan_task_idx_),
|
LOG_WARN("unexpected pwj_rescan_task_infos_ count", K(ret), K(rescan_task_idx_),
|
||||||
@ -1030,6 +1032,7 @@ int ObGranuleIteratorOp::fetch_normal_pw_task_infos(const common::ObIArray<int64
|
|||||||
LOG_WARN("try fetch task failed", K(ret));
|
LOG_WARN("try fetch task failed", K(ret));
|
||||||
} else {
|
} else {
|
||||||
state_ = GI_END;
|
state_ = GI_END;
|
||||||
|
all_task_fetched_ = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0; i < gi_task_infos.count() && OB_SUCC(ret); ++i) {
|
for (int i = 0; i < gi_task_infos.count() && OB_SUCC(ret); ++i) {
|
||||||
|
|||||||
Reference in New Issue
Block a user