diff --git a/src/sql/engine/px/ob_granule_pump.cpp b/src/sql/engine/px/ob_granule_pump.cpp index e1f16b5211..d0df385452 100644 --- a/src/sql/engine/px/ob_granule_pump.cpp +++ b/src/sql/engine/px/ob_granule_pump.cpp @@ -457,7 +457,9 @@ int ObGranulePump::fetch_pw_granule_from_shared_pool(ObIArray // 表示取不到下一个GI task的op的个数; // 理论上end_op_count只能等于0(表示gi任务还没有被消费完)或者等于`op_ids.count()`(表示gi任务全部被消费完) int64_t end_op_count = 0; - if (no_more_task_from_shared_pool_) { + if (OB_FAIL(fetch_task_ret_)) { + LOG_WARN("fetch task concurrently already failed", K(ret)); + } else if (no_more_task_from_shared_pool_) { ret = OB_ITER_END; } else if (GIT_FULL_PARTITION_WISE != splitter_type_) { ret = OB_ERR_UNEXPECTED; @@ -489,6 +491,7 @@ int ObGranulePump::fetch_pw_granule_from_shared_pool(ObIArray // 防御性代码:检查full partition wise的情况下,每一个op对应的GI task是否被同时消费完毕 if (OB_FAIL(ret)) { + fetch_task_ret_ = ret; } else if (OB_FAIL(check_pw_end(end_op_count, op_ids.count(), infos.count()))) { if (OB_ITER_END != ret) { LOG_WARN("incorrect state", K(ret)); @@ -1617,6 +1620,7 @@ int ObGranulePump::reset_gi_task() } else { is_taskset_reset_ = true; no_more_task_from_shared_pool_ = false; + fetch_task_ret_ = OB_SUCCESS; for (int64_t i = 0; i < gi_task_array_map_.count() && OB_SUCC(ret); ++i) { GITaskArrayItem &item = gi_task_array_map_.at(i); for(int64_t j = 0; j < item.taskset_array_.count() && OB_SUCC(ret); ++j) { diff --git a/src/sql/engine/px/ob_granule_pump.h b/src/sql/engine/px/ob_granule_pump.h index 63ee06dd03..c964e59e64 100644 --- a/src/sql/engine/px/ob_granule_pump.h +++ b/src/sql/engine/px/ob_granule_pump.h @@ -481,7 +481,8 @@ public: need_partition_pruning_(false), pruning_table_locations_(), pump_version_(0), - is_taskset_reset_(false) + is_taskset_reset_(false), + fetch_task_ret_(OB_SUCCESS) { } @@ -593,6 +594,9 @@ private: int64_t pump_version_; bool is_taskset_reset_; + // when granule tasks are fetched concurrently, if one thread failed to fetch task, + // others should not fetch tasks any more. + int fetch_task_ret_; }; }//sql