fix gi fetch task concurrently bug

This commit is contained in:
sdc 2023-11-23 07:11:28 +00:00 committed by ob-robot
parent a69e3a4148
commit e0ed12fb6e
2 changed files with 10 additions and 2 deletions

View File

@ -457,7 +457,9 @@ int ObGranulePump::fetch_pw_granule_from_shared_pool(ObIArray<ObGranuleTaskInfo>
// 表示取不到下一个GI task的op的个数;
// 理论上end_op_count只能等于0(表示gi任务还没有被消费完)或者等于`op_ids.count()`(表示gi任务全部被消费完)
int64_t end_op_count = 0;
if (no_more_task_from_shared_pool_) {
if (OB_FAIL(fetch_task_ret_)) {
LOG_WARN("fetch task concurrently already failed", K(ret));
} else if (no_more_task_from_shared_pool_) {
ret = OB_ITER_END;
} else if (GIT_FULL_PARTITION_WISE != splitter_type_) {
ret = OB_ERR_UNEXPECTED;
@ -489,6 +491,7 @@ int ObGranulePump::fetch_pw_granule_from_shared_pool(ObIArray<ObGranuleTaskInfo>
// 防御性代码:检查full partition wise的情况下,每一个op对应的GI task是否被同时消费完毕
if (OB_FAIL(ret)) {
fetch_task_ret_ = ret;
} else if (OB_FAIL(check_pw_end(end_op_count, op_ids.count(), infos.count()))) {
if (OB_ITER_END != ret) {
LOG_WARN("incorrect state", K(ret));
@ -1617,6 +1620,7 @@ int ObGranulePump::reset_gi_task()
} else {
is_taskset_reset_ = true;
no_more_task_from_shared_pool_ = false;
fetch_task_ret_ = OB_SUCCESS;
for (int64_t i = 0; i < gi_task_array_map_.count() && OB_SUCC(ret); ++i) {
GITaskArrayItem &item = gi_task_array_map_.at(i);
for(int64_t j = 0; j < item.taskset_array_.count() && OB_SUCC(ret); ++j) {

View File

@ -481,7 +481,8 @@ public:
need_partition_pruning_(false),
pruning_table_locations_(),
pump_version_(0),
is_taskset_reset_(false)
is_taskset_reset_(false),
fetch_task_ret_(OB_SUCCESS)
{
}
@ -593,6 +594,9 @@ private:
int64_t pump_version_;
bool is_taskset_reset_;
// when granule tasks are fetched concurrently, if one thread failed to fetch task,
// others should not fetch tasks any more.
int fetch_task_ret_;
};
}//sql