Fix unexpected table scanned by ddl redefinition task and reject stale switch_status
This commit is contained in:
@ -90,6 +90,7 @@ int ObDDLRedefinitionSSTableBuildTask::process()
|
|||||||
ObDDLTaskInfo info;
|
ObDDLTaskInfo info;
|
||||||
bool oracle_mode = false;
|
bool oracle_mode = false;
|
||||||
bool need_exec_new_inner_sql = true;
|
bool need_exec_new_inner_sql = true;
|
||||||
|
const ObTableSchema *data_table_schema = nullptr;
|
||||||
|
|
||||||
if (OB_UNLIKELY(!is_inited_)) {
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -107,17 +108,22 @@ int ObDDLRedefinitionSSTableBuildTask::process()
|
|||||||
LOG_WARN("sys variable schema is NULL", K(ret));
|
LOG_WARN("sys variable schema is NULL", K(ret));
|
||||||
} else if (OB_FAIL(sys_variable_schema->get_oracle_mode(oracle_mode))) {
|
} else if (OB_FAIL(sys_variable_schema->get_oracle_mode(oracle_mode))) {
|
||||||
LOG_WARN("get oracle mode failed", K(ret));
|
LOG_WARN("get oracle mode failed", K(ret));
|
||||||
|
} else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, data_table_id_, data_table_schema))) {
|
||||||
|
LOG_WARN("get table schema failed", K(ret), K(tenant_id_), K(data_table_id_));
|
||||||
|
} else if (OB_ISNULL(data_table_schema)) {
|
||||||
|
ret = OB_TABLE_NOT_EXIST;
|
||||||
|
LOG_WARN("error unexpected, table schema must not be nullptr", K(ret), K(tenant_id_), K(data_table_id_));
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_,
|
if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_,
|
||||||
data_table_id_,
|
data_table_id_,
|
||||||
dest_table_id_,
|
dest_table_id_,
|
||||||
schema_version_,
|
data_table_schema->get_schema_version(),
|
||||||
snapshot_version_,
|
snapshot_version_,
|
||||||
execution_id_,
|
execution_id_,
|
||||||
task_id_,
|
task_id_,
|
||||||
parallelism_,
|
parallelism_,
|
||||||
use_heap_table_ddl_plan_,
|
use_heap_table_ddl_plan_,
|
||||||
false/*use_schema_version_hint_for_src_table*/,
|
true/*use_schema_version_hint_for_src_table*/,
|
||||||
&col_name_map_,
|
&col_name_map_,
|
||||||
sql_string))) {
|
sql_string))) {
|
||||||
LOG_WARN("fail to generate build replica sql", K(ret));
|
LOG_WARN("fail to generate build replica sql", K(ret));
|
||||||
|
|||||||
@ -692,8 +692,13 @@ int ObDDLRetryTask::update_task_status_wait_child_task_finish(
|
|||||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id));
|
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id));
|
||||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id, task_id, curr_task_status, execution_id))) {
|
} else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id, task_id, curr_task_status, execution_id))) {
|
||||||
LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id));
|
LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id));
|
||||||
|
} else if (OB_UNLIKELY(ObDDLTaskStatus::DROP_SCHEMA != curr_task_status)) {
|
||||||
|
ret = OB_STATE_NOT_MATCH;
|
||||||
|
LOG_WARN("task status updated", K(ret), K(task_id), K(curr_task_status));
|
||||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_task_status(trans, tenant_id, task_id, new_task_status))) {
|
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_task_status(trans, tenant_id, task_id, new_task_status))) {
|
||||||
LOG_WARN("update task status failed", K(ret));
|
LOG_WARN("update task status failed", K(ret));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("update task status to wait child task finish", K(ret));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -968,16 +968,17 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable
|
|||||||
int64_t table_task_status = 0;
|
int64_t table_task_status = 0;
|
||||||
int64_t execution_id = -1;
|
int64_t execution_id = -1;
|
||||||
if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, execution_id))) {
|
if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, execution_id))) {
|
||||||
|
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||||
|
need_retry_ = false;
|
||||||
|
}
|
||||||
LOG_WARN("select for update failed", K(ret), K(task_id_));
|
LOG_WARN("select for update failed", K(ret), K(task_id_));
|
||||||
} else if (old_status != task_status_) {
|
|
||||||
ret = OB_EAGAIN;
|
|
||||||
LOG_WARN("task status has changed", K(ret));
|
|
||||||
} else if (table_task_status == FAIL && old_status != table_task_status) {
|
} else if (table_task_status == FAIL && old_status != table_task_status) {
|
||||||
// task failed marked by user
|
// task failed marked by user
|
||||||
real_new_status = FAIL;
|
real_new_status = FAIL;
|
||||||
ret_code_ = OB_CANCELED;
|
ret_code_ = OB_CANCELED;
|
||||||
} else if (table_task_status == SUCCESS && old_status != table_task_status) {
|
} else if (old_status != table_task_status) {
|
||||||
real_new_status = SUCCESS;
|
// refresh status
|
||||||
|
real_new_status = static_cast<ObDDLTaskStatus>(table_task_status);
|
||||||
} else if (old_status == real_new_status) {
|
} else if (old_status == real_new_status) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user