diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 0199956e5..6fd8e6ccc 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -3110,7 +3110,7 @@ int ObService::check_and_cancel_ddl_complement_data_dag(const ObDDLBuildSingleRe LOG_WARN("fail to init complement data dag", K(ret), K(arg)); } else if (OB_FAIL(dag_scheduler->check_dag_exist(dag, is_dag_exist))) { LOG_WARN("check dag exist failed", K(ret)); - } else if (is_dag_exist && OB_FAIL(dag_scheduler->cancel_dag(dag))) { + } else if (is_dag_exist && OB_FAIL(dag_scheduler->cancel_dag(dag, true/*force_cancel, to cancel running dag by yield.*/))) { // sync to cancel ready dag only, not including running dag. LOG_WARN("cancel dag failed", KP(dag), K(ret)); } diff --git a/src/storage/ddl/ob_complement_data_task.cpp b/src/storage/ddl/ob_complement_data_task.cpp index f942aecb3..a85f14b96 100644 --- a/src/storage/ddl/ob_complement_data_task.cpp +++ b/src/storage/ddl/ob_complement_data_task.cpp @@ -717,42 +717,6 @@ int ObComplementDataDag::fill_dag_key(char *buf, const int64_t buf_len) const return ret; } -int ObComplementDataDag::check_and_exit_on_demand() -{ - int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; - if (OB_UNLIKELY(!is_inited_)) { - ret = OB_NOT_INIT; - LOG_WARN("dag has not been initialized", K(ret)); - } else { - DEBUG_SYNC(HOLD_DDL_COMPLEMENT_DAG_WHEN_APPEND_ROW); - SMART_VAR(ObMySQLProxy::MySQLResult, res) { - ObSqlString sql_string; - sqlclient::ObMySQLResult *result = nullptr; - if (OB_TMP_FAIL(sql_string.assign_fmt("SELECT status FROM %s WHERE task_id = %lu", share::OB_ALL_DDL_TASK_STATUS_TNAME, param_.task_id_))) { - LOG_WARN("assign sql string failed", K(tmp_ret), K(param_)); - } else if (OB_TMP_FAIL(GCTX.sql_proxy_->read(res, param_.dest_tenant_id_, sql_string.ptr()))) { - LOG_WARN("fail to execute sql", K(tmp_ret), K(sql_string)); - } else if (OB_ISNULL(result = res.get_result())) { - tmp_ret = OB_ERR_UNEXPECTED; - LOG_WARN("error unexpected, query result must not be NULL", K(tmp_ret)); - } else if (OB_TMP_FAIL(result->next())) { - if (OB_ENTRY_NOT_EXIST == tmp_ret) { - ret = OB_CANCELED; - } - LOG_WARN("iterate next failed", K(ret), K(tmp_ret)); - } else { - int task_status = 0; - EXTRACT_INT_FIELD_MYSQL(*result, "status", task_status, int); - if (OB_SUCC(ret)) { - ret = task_status == ObDDLTaskStatus::REDEFINITION ? ret : OB_CANCELED; - } - } - } - } - return ret; -} - ObComplementPrepareTask::ObComplementPrepareTask() : ObITask(TASK_TYPE_COMPLEMENT_PREPARE), is_inited_(false), param_(nullptr), context_(nullptr) { @@ -901,7 +865,7 @@ int ObComplementWriteTask::generate_next_task(ObITask *&next_task) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObComplementWriteTask has not been inited", K(ret)); - } else if (next_task_id == param_->concurrent_cnt_) { + } else if (next_task_id >= param_->concurrent_cnt_) { ret = OB_ITER_END; } else if (OB_ISNULL(tmp_dag)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/storage/ddl/ob_complement_data_task.h b/src/storage/ddl/ob_complement_data_task.h index 473e06d40..95c383966 100644 --- a/src/storage/ddl/ob_complement_data_task.h +++ b/src/storage/ddl/ob_complement_data_task.h @@ -203,7 +203,6 @@ public: // report replica build status to RS. int report_replica_build_status(); int calc_total_row_count(); - int check_and_exit_on_demand(); private: bool is_inited_; ObComplementDataParam param_; diff --git a/src/storage/ddl/ob_direct_load_struct.cpp b/src/storage/ddl/ob_direct_load_struct.cpp index 9b9a20bbf..1b24e6c5c 100755 --- a/src/storage/ddl/ob_direct_load_struct.cpp +++ b/src/storage/ddl/ob_direct_load_struct.cpp @@ -1424,7 +1424,9 @@ int ObDirectLoadSliceWriter::fill_sstable_slice( while (OB_SUCC(ret)) { arena.reuse(); const blocksstable::ObDatumRow *cur_row = nullptr; - if (OB_FAIL(THIS_WORKER.check_status())) { + if (OB_FAIL(share::dag_yield())) { + LOG_WARN("dag yield failed", K(ret), K(affected_rows)); // exit for dag task as soon as possible after canceled. + } else if (OB_FAIL(THIS_WORKER.check_status())) { LOG_WARN("check status failed", K(ret)); } else if (ATOMIC_LOAD(&is_canceled_)) { ret = OB_CANCELED; diff --git a/src/storage/ddl/ob_tablet_lob_split_task.cpp b/src/storage/ddl/ob_tablet_lob_split_task.cpp index ae433f9b1..37f7b157c 100644 --- a/src/storage/ddl/ob_tablet_lob_split_task.cpp +++ b/src/storage/ddl/ob_tablet_lob_split_task.cpp @@ -690,7 +690,7 @@ int ObTabletLobBuildMapTask::generate_next_task(ObITask *&next_task) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObTabletLobBuildMapTask has not been inited", K(ret)); - } else if (next_task_id == param_->parallelism_) { + } else if (next_task_id >= param_->parallelism_) { ret = OB_ITER_END; } else if (OB_ISNULL(tmp_dag)) { ret = OB_ERR_UNEXPECTED; @@ -1092,7 +1092,7 @@ int ObTabletLobWriteDataTask::generate_next_task(ObITask *&next_task) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObTabletLobWriteDataTask has not been inited", K(ret)); - } else if (true || next_task_id == param_->parallelism_) { // not allow para now + } else if (true || next_task_id >= param_->parallelism_) { // not allow para now ret = OB_ITER_END; } else if (OB_ISNULL(tmp_dag)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/storage/ddl/ob_tablet_split_task.cpp b/src/storage/ddl/ob_tablet_split_task.cpp index f54ea64d2..0bff83a38 100644 --- a/src/storage/ddl/ob_tablet_split_task.cpp +++ b/src/storage/ddl/ob_tablet_split_task.cpp @@ -740,7 +740,7 @@ int ObTabletSplitWriteTask::generate_next_task(ObITask *&next_task) } else if (FALSE_IT(dag = static_cast (tmp_dag))) { } else if (param_->can_reuse_macro_block_) { ret = OB_ITER_END; - } else if (next_task_id == context_->data_split_ranges_.count()) { + } else if (next_task_id >= context_->data_split_ranges_.count()) { ret = OB_ITER_END; } else if (OB_FAIL(dag->alloc_task(next_write_task))) { LOG_WARN("alloc task failed", K(ret));