exit dag task after canceled.
This commit is contained in:
parent
a6fd265a51
commit
b1af116837
@ -3110,7 +3110,7 @@ int ObService::check_and_cancel_ddl_complement_data_dag(const ObDDLBuildSingleRe
|
||||
LOG_WARN("fail to init complement data dag", K(ret), K(arg));
|
||||
} else if (OB_FAIL(dag_scheduler->check_dag_exist(dag, is_dag_exist))) {
|
||||
LOG_WARN("check dag exist failed", K(ret));
|
||||
} else if (is_dag_exist && OB_FAIL(dag_scheduler->cancel_dag(dag))) {
|
||||
} else if (is_dag_exist && OB_FAIL(dag_scheduler->cancel_dag(dag, true/*force_cancel, to cancel running dag by yield.*/))) {
|
||||
// sync to cancel ready dag only, not including running dag.
|
||||
LOG_WARN("cancel dag failed", KP(dag), K(ret));
|
||||
}
|
||||
|
@ -717,42 +717,6 @@ int ObComplementDataDag::fill_dag_key(char *buf, const int64_t buf_len) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObComplementDataDag::check_and_exit_on_demand()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("dag has not been initialized", K(ret));
|
||||
} else {
|
||||
DEBUG_SYNC(HOLD_DDL_COMPLEMENT_DAG_WHEN_APPEND_ROW);
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObSqlString sql_string;
|
||||
sqlclient::ObMySQLResult *result = nullptr;
|
||||
if (OB_TMP_FAIL(sql_string.assign_fmt("SELECT status FROM %s WHERE task_id = %lu", share::OB_ALL_DDL_TASK_STATUS_TNAME, param_.task_id_))) {
|
||||
LOG_WARN("assign sql string failed", K(tmp_ret), K(param_));
|
||||
} else if (OB_TMP_FAIL(GCTX.sql_proxy_->read(res, param_.dest_tenant_id_, sql_string.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", K(tmp_ret), K(sql_string));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
tmp_ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("error unexpected, query result must not be NULL", K(tmp_ret));
|
||||
} else if (OB_TMP_FAIL(result->next())) {
|
||||
if (OB_ENTRY_NOT_EXIST == tmp_ret) {
|
||||
ret = OB_CANCELED;
|
||||
}
|
||||
LOG_WARN("iterate next failed", K(ret), K(tmp_ret));
|
||||
} else {
|
||||
int task_status = 0;
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "status", task_status, int);
|
||||
if (OB_SUCC(ret)) {
|
||||
ret = task_status == ObDDLTaskStatus::REDEFINITION ? ret : OB_CANCELED;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObComplementPrepareTask::ObComplementPrepareTask()
|
||||
: ObITask(TASK_TYPE_COMPLEMENT_PREPARE), is_inited_(false), param_(nullptr), context_(nullptr)
|
||||
{
|
||||
@ -901,7 +865,7 @@ int ObComplementWriteTask::generate_next_task(ObITask *&next_task)
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObComplementWriteTask has not been inited", K(ret));
|
||||
} else if (next_task_id == param_->concurrent_cnt_) {
|
||||
} else if (next_task_id >= param_->concurrent_cnt_) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_ISNULL(tmp_dag)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -203,7 +203,6 @@ public:
|
||||
// report replica build status to RS.
|
||||
int report_replica_build_status();
|
||||
int calc_total_row_count();
|
||||
int check_and_exit_on_demand();
|
||||
private:
|
||||
bool is_inited_;
|
||||
ObComplementDataParam param_;
|
||||
|
@ -1424,7 +1424,9 @@ int ObDirectLoadSliceWriter::fill_sstable_slice(
|
||||
while (OB_SUCC(ret)) {
|
||||
arena.reuse();
|
||||
const blocksstable::ObDatumRow *cur_row = nullptr;
|
||||
if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
if (OB_FAIL(share::dag_yield())) {
|
||||
LOG_WARN("dag yield failed", K(ret), K(affected_rows)); // exit for dag task as soon as possible after canceled.
|
||||
} else if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
LOG_WARN("check status failed", K(ret));
|
||||
} else if (ATOMIC_LOAD(&is_canceled_)) {
|
||||
ret = OB_CANCELED;
|
||||
|
@ -690,7 +690,7 @@ int ObTabletLobBuildMapTask::generate_next_task(ObITask *&next_task)
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTabletLobBuildMapTask has not been inited", K(ret));
|
||||
} else if (next_task_id == param_->parallelism_) {
|
||||
} else if (next_task_id >= param_->parallelism_) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_ISNULL(tmp_dag)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -1092,7 +1092,7 @@ int ObTabletLobWriteDataTask::generate_next_task(ObITask *&next_task)
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTabletLobWriteDataTask has not been inited", K(ret));
|
||||
} else if (true || next_task_id == param_->parallelism_) { // not allow para now
|
||||
} else if (true || next_task_id >= param_->parallelism_) { // not allow para now
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_ISNULL(tmp_dag)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
|
@ -740,7 +740,7 @@ int ObTabletSplitWriteTask::generate_next_task(ObITask *&next_task)
|
||||
} else if (FALSE_IT(dag = static_cast<ObTabletSplitDag *> (tmp_dag))) {
|
||||
} else if (param_->can_reuse_macro_block_) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (next_task_id == context_->data_split_ranges_.count()) {
|
||||
} else if (next_task_id >= context_->data_split_ranges_.count()) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL(dag->alloc_task(next_write_task))) {
|
||||
LOG_WARN("alloc task failed", K(ret));
|
||||
|
Loading…
x
Reference in New Issue
Block a user