diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp index 9e037d0a7..e5d59c0a0 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp @@ -484,12 +484,10 @@ int ObColumnRedefinitionTask::serialize_params_to_message(char *buf, const int64 if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len)); - } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) { - LOG_WARN("fail to serialize task version", K(ret), K(task_version_)); + } else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) { + LOG_WARN("ObDDLTask serialize failed", K(ret)); } else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) { - LOG_WARN("serialize table arg failed", K(ret)); - } else { - LST_DO_CODE(OB_UNIS_ENCODE, parallelism_, data_format_version_); + LOG_WARN("alter_table_arg_ serialize failed", K(ret)); } return ret; } @@ -500,27 +498,22 @@ int ObColumnRedefinitionTask::deserlize_params_from_message(const uint64_t tenan obrpc::ObAlterTableArg tmp_arg; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); - } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { - LOG_WARN("fail to deserialize task version", K(ret)); + LOG_WARN("invalid arguments", K(ret), KP(buf), K(tenant_id), K(data_len)); + } else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos))) { + LOG_WARN("ObDDLTask deserlize failed", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("serialize table failed", K(ret)); } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) { LOG_WARN("deep copy table arg failed", K(ret)); - } else { - LST_DO_CODE(OB_UNIS_DECODE, parallelism_, data_format_version_); } return ret; } int64_t ObColumnRedefinitionTask::get_serialize_param_size() const { - return alter_table_arg_.get_serialize_size() - + serialization::encoded_length_i64(task_version_) - + serialization::encoded_length_i64(parallelism_) - + serialization::encoded_length_i64(data_format_version_); + return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size(); } int ObColumnRedefinitionTask::copy_table_dependent_objects(const ObDDLTaskStatus next_task_status) diff --git a/src/rootserver/ddl_task/ob_constraint_task.cpp b/src/rootserver/ddl_task/ob_constraint_task.cpp index b6dde55e5..ea2fd9ed2 100644 --- a/src/rootserver/ddl_task/ob_constraint_task.cpp +++ b/src/rootserver/ddl_task/ob_constraint_task.cpp @@ -837,7 +837,8 @@ int ObConstraintTask::cleanup_impl() } if (OB_SUCC(ret) && parent_task_id_ > 0) { - root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id_, get_task_key(), ret_code_, trace_id_); + const ObDDLTaskID parent_task_id(tenant_id_, parent_task_id_); + root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id, get_task_key(), ret_code_, trace_id_); } return ret; } @@ -1811,8 +1812,8 @@ int ObConstraintTask::serialize_params_to_message(char *buf, const int64_t buf_l if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len)); - } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) { - LOG_WARN("fail to serialize task version", K(ret), K(task_version_)); + } else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) { + LOG_WARN("ObDDLTask serialize failed", K(ret)); } else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) { LOG_WARN("serialize table arg failed", K(ret)); } @@ -1826,8 +1827,8 @@ int ObConstraintTask::deserlize_params_from_message(const uint64_t tenant_id, co if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); - } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { - LOG_WARN("fail to deserialize task version", K(ret)); + } else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos))) { + LOG_WARN("ObDDLTask deserlize failed", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("serialize table failed", K(ret)); } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { @@ -1840,9 +1841,8 @@ int ObConstraintTask::deserlize_params_from_message(const uint64_t tenant_id, co int64_t ObConstraintTask::get_serialize_param_size() const { - return alter_table_arg_.get_serialize_size() + serialization::encoded_length_i64(task_version_); + return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size(); } - void ObConstraintTask::flt_set_task_span_tag() const { FLT_SET_TAG(ddl_task_id, task_id_, ddl_parent_task_id, parent_task_id_, diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h index ea87e46df..a00708142 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h @@ -42,6 +42,7 @@ public: const ObTableSchema &orig_table_schema, const AlterTableSchema &alter_table_schema, const ObTimeZoneInfoWrap &tz_info_wrap); + ObDDLTaskID get_ddl_task_id() { return ObDDLTaskID(tenant_id_, task_id_); } virtual ~ObDDLRedefinitionSSTableBuildTask() = default; virtual int process() override; virtual int64_t get_deep_copy_size() const override { return sizeof(*this); } diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp index ed59a0cfe..f400e7a0b 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp @@ -606,8 +606,8 @@ int ObDDLRetryTask::serialize_params_to_message(char *buf, const int64_t buf_siz if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_size)); - } else if (OB_FAIL(serialization::encode_i64(buf, buf_size, pos, task_version_))) { - LOG_WARN("fail to serialize task version", K(ret), K(task_version_)); + } else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_size, pos))) { + LOG_WARN("ObDDLTask serialize failed", K(ret)); } else if (OB_FAIL(ddl_arg_->serialize(buf, buf_size, pos))) { LOG_WARN("serialize table arg failed", K(ret)); } @@ -620,8 +620,8 @@ int ObDDLRetryTask::deserlize_params_from_message(const uint64_t tenant_id, cons if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(buf_size)); - } else if (OB_FAIL(serialization::decode_i64(buf, buf_size, pos, &task_version_))) { - LOG_WARN("fail to deserialize task version", K(ret)); + } else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, buf_size, pos))) { + LOG_WARN("fail to deserialize ObDDLTask", K(ret)); } else if (ObDDLType::DDL_DROP_DATABASE == task_type_) { obrpc::ObDropDatabaseArg tmp_arg; if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) { @@ -667,7 +667,7 @@ int ObDDLRetryTask::deserlize_params_from_message(const uint64_t tenant_id, cons int64_t ObDDLRetryTask::get_serialize_param_size() const { - int64_t serialize_param_size = serialization::encoded_length_i64(task_version_); + int64_t serialize_param_size = ObDDLTask::get_serialize_param_size(); if (OB_NOT_NULL(ddl_arg_)) { serialize_param_size += ddl_arg_->get_serialize_size(); } diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 2f247f0d9..87cc6e0df 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -103,7 +103,7 @@ int ObDDLTaskQueue::push_task(ObDDLTask *task) } else { task_add_to_map = true; if (OB_FAIL(ret)) { - } else if (OB_FAIL(task_id_map_.set_refactored(task->get_task_id(), task, is_overwrite))) { + } else if (OB_FAIL(task_id_map_.set_refactored(task->get_ddl_task_id(), task, is_overwrite))) { if (common::OB_HASH_EXIST == ret) { ret = common::OB_ENTRY_EXIST; } else { @@ -160,7 +160,7 @@ int ObDDLTaskQueue::remove_task(ObDDLTask *task) } else { LOG_INFO("succ to remove task", K(*task), KP(task)); } - if (OB_SUCCESS != (tmp_ret = task_id_map_.erase_refactored(task->get_task_id()))) { + if (OB_SUCCESS != (tmp_ret = task_id_map_.erase_refactored(task->get_ddl_task_id()))) { LOG_WARN("erase task from map failed", K(ret)); ret = OB_SUCCESS == ret ? tmp_ret : ret; } @@ -209,7 +209,7 @@ int ObDDLTaskQueue::modify_task(const ObDDLTaskKey &task_key, F &&op) } template -int ObDDLTaskQueue::modify_task(const int64_t task_id, F &&op) +int ObDDLTaskQueue::modify_task(const ObDDLTaskID &task_id, F &&op) { int ret = OB_SUCCESS; common::ObSpinLockGuard guard(lock_); @@ -217,7 +217,7 @@ int ObDDLTaskQueue::modify_task(const int64_t task_id, F &&op) if (OB_UNLIKELY(!is_inited_)) { ret = common::OB_NOT_INIT; LOG_WARN("ObDDLTaskQueue has not been inited", K(ret)); - } else if (OB_UNLIKELY(task_id < 0)) { + } else if (OB_UNLIKELY(!task_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(task_id)); } else if (OB_FAIL(task_id_map_.get_refactored(task_id, task))) { @@ -232,7 +232,7 @@ int ObDDLTaskQueue::modify_task(const int64_t task_id, F &&op) return ret; } -int ObDDLTaskQueue::update_task_copy_deps_setting(const int64_t task_id, +int ObDDLTaskQueue::update_task_copy_deps_setting(const ObDDLTaskID &task_id, const bool is_copy_constraints, const bool is_copy_indexes, const bool is_copy_triggers, @@ -246,7 +246,7 @@ int ObDDLTaskQueue::update_task_copy_deps_setting(const int64_t task_id, if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLTaskQueue has not been inited", K(ret)); - } else if (OB_UNLIKELY(task_id <= 0)) { + } else if (OB_UNLIKELY(!task_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(task_id)); } else if (OB_FAIL(task_id_map_.get_refactored(task_id, task))) { @@ -265,7 +265,7 @@ int ObDDLTaskQueue::update_task_copy_deps_setting(const int64_t task_id, return ret; } -int ObDDLTaskQueue::update_task_process_schedulable(const int64_t task_id) +int ObDDLTaskQueue::update_task_process_schedulable(const ObDDLTaskID &task_id) { int ret = OB_SUCCESS; ObDDLTask *ddl_task = nullptr; @@ -274,7 +274,7 @@ int ObDDLTaskQueue::update_task_process_schedulable(const int64_t task_id) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLTaskQueue has not been inited", K(ret)); - } else if (OB_UNLIKELY(task_id <= 0)) { + } else if (OB_UNLIKELY(!task_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(task_id)); } else if (OB_FAIL(task_id_map_.get_refactored(task_id, ddl_task))) { @@ -289,7 +289,7 @@ int ObDDLTaskQueue::update_task_process_schedulable(const int64_t task_id) return ret; } -int ObDDLTaskQueue::abort_task(const int64_t task_id) +int ObDDLTaskQueue::abort_task(const ObDDLTaskID &task_id) { int ret = OB_SUCCESS; share::ObTaskId trace_id; @@ -298,7 +298,7 @@ int ObDDLTaskQueue::abort_task(const int64_t task_id) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLTaskQueue has not been inited", K(ret)); - } else if (OB_UNLIKELY(task_id <= 0)) { + } else if (OB_UNLIKELY(!task_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(task_id)); } else if (OB_FAIL(task_id_map_.get_refactored(task_id, ddl_task))) { @@ -308,6 +308,7 @@ int ObDDLTaskQueue::abort_task(const int64_t task_id) ret = OB_ERR_UNEXPECTED; LOG_WARN("ddl_task is null", K(ret)); } else { + ddl_task->set_is_abort(true); trace_id.set(ddl_task->get_trace_id()); if (OB_UNLIKELY(trace_id.is_invalid())) { ret = OB_INVALID_ARGUMENT; @@ -346,31 +347,37 @@ int ObDDLTaskHeartBeatMananger::init() return ret; } -int ObDDLTaskHeartBeatMananger::update_task_active_time(const int64_t task_id) +int ObDDLTaskHeartBeatMananger::update_task_active_time(const ObDDLTaskID &task_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObManagerRegisterHeartBeatTask not inited", K(ret)); + } else if (OB_UNLIKELY(!task_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_INFO("invalid argument", K(ret), K(task_id)); } else { - ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id); + ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id.task_id_); // setting flag=1 to update the old time-value in the hash map with current time if (OB_FAIL(register_task_time_.set_refactored(task_id, ObTimeUtility::current_time(), 1, 0, 0))) { - LOG_WARN("set register task time failed", K(ret)); + LOG_WARN("set register task time failed", K(ret), K(task_id)); } } return ret; } -int ObDDLTaskHeartBeatMananger::remove_task(const int64_t task_id) +int ObDDLTaskHeartBeatMananger::remove_task(const ObDDLTaskID &task_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObManagerRegisterHeartBeatTask not inited", K(ret)); + } else if (OB_UNLIKELY(!task_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_INFO("invalid argument", K(ret), K(task_id)); } else { - ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id); + ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id.task_id_); if (OB_FAIL(register_task_time_.erase_refactored(task_id))) { LOG_WARN("remove register task time failed", K(ret)); } @@ -378,7 +385,7 @@ int ObDDLTaskHeartBeatMananger::remove_task(const int64_t task_id) return ret; } -int ObDDLTaskHeartBeatMananger::get_inactive_ddl_task_ids(ObArray& remove_task_ids) +int ObDDLTaskHeartBeatMananger::get_inactive_ddl_task_ids(ObArray& remove_task_ids) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { @@ -392,7 +399,7 @@ int ObDDLTaskHeartBeatMananger::get_inactive_ddl_task_ids(ObArray& remo ret = OB_SUCCESS; } } else { - for (common::hash::ObHashMap::iterator it = register_task_time_.begin(); OB_SUCC(ret) && it != register_task_time_.end(); it++) { + for (common::hash::ObHashMap::iterator it = register_task_time_.begin(); OB_SUCC(ret) && it != register_task_time_.end(); it++) { if (ObTimeUtility::current_time() - it->second > TIME_OUT_THRESHOLD) { if (OB_FAIL(remove_task_ids.push_back(it->first))) { LOG_WARN("remove_task_ids push_back task_id fail", K(ret), K(it->first)); @@ -876,26 +883,48 @@ int ObDDLScheduler::prepare_alter_table_arg(const ObPrepareAlterTableArgParam &p return ret; } -int ObDDLScheduler::abort_redef_table(const int64_t task_id) +template +int ObDDLScheduler::update_task_info(const ObDDLTaskID &task_id, + ObMySQLTransaction &trans, + ObDDLTaskRecord &task_record, + ObTableRedefinitionTask *ddl_task, + common::ObArenaAllocator &allocator, + F &&modify_info) { int ret = OB_SUCCESS; - share::ObTaskId trace_id; - ObDDLTask *ddl_task = nullptr; - if (OB_UNLIKELY(task_id <= 0)) { + if (OB_UNLIKELY(!task_id.is_valid() || !task_record.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arg", K(ret), K(task_id)); - } else if (OB_FAIL(task_queue_.abort_task(task_id))) { - LOG_WARN("abort redef table task failed", K(ret), K(task_id)); - if (OB_ENTRY_NOT_EXIST == ret) { - bool exist = false; - if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(root_service_->get_sql_proxy(), task_id, exist))) { - LOG_WARN("check task id exist fail", K(ret), K(task_id)); - } else { - if (exist) { - ret = OB_EAGAIN; - LOG_INFO("entry exist, the ddl scheduler hasn't recovered the task yet", K(ret), K(task_id)); - } else { - LOG_WARN("this task does not exist in the hash table", K(ret), K(task_id)); + LOG_WARN("task_id is invalid", K(ret), K(task_id), K(task_record)); + } else if (OB_FAIL(ddl_task->init(task_record))) { + LOG_WARN("ddl_task init failed", K(ret)); + } else if (OB_FAIL(ddl_task->set_trace_id(task_record.trace_id_))) { + LOG_WARN("set trace id failed", K(ret)); + } else if (OB_FAIL(ddl_task->convert_to_record(task_record, allocator))) { + LOG_WARN("convert to ddl task record failed", K(ret), KP(ddl_task)); + } else if (OB_UNLIKELY(!task_record.is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ddl task record is invalid", K(ret), K(task_record)); + } else { + ObString message; + message.assign(task_record.message_.ptr(), task_record.message_.length()); + if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans, task_id.tenant_id_, task_id.task_id_, message))) { + LOG_WARN("update task message failed", K(ret), K(task_id.tenant_id_), K(task_id.task_id_), K(message)); + } else { + if (OB_SUCC(ret)) { + if (OB_FAIL(modify_info())) { + if (OB_ENTRY_NOT_EXIST == ret) { + bool exist = false; + if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(root_service_->get_sql_proxy(), task_id.task_id_, exist))) { + LOG_WARN("check task id exist fail", K(ret), K(task_id)); + } else { + if (exist) { + ret = OB_EAGAIN; + LOG_INFO("entry exist, the ddl scheduler hasn't recovered the task yet", K(ret), K(task_id)); + } else { + LOG_WARN("this task does not exist int hash table", K(ret), K(task_id)); + } + } + } } } } @@ -903,8 +932,64 @@ int ObDDLScheduler::abort_redef_table(const int64_t task_id) return ret; } -int ObDDLScheduler::copy_table_dependents(const int64_t task_id, - const uint64_t tenant_id, +int ObDDLScheduler::abort_redef_table(const ObDDLTaskID &task_id) +{ + int ret = OB_SUCCESS; + share::ObTaskId trace_id; + ObDDLTask *ddl_task = nullptr; + int64_t table_task_status = 0; + int64_t table_execution_id = 0; + ObMySQLTransaction trans; + if (OB_UNLIKELY(!task_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(task_id)); + } else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), task_id.tenant_id_))) { + LOG_WARN("start transaction failed", K(ret), K_(task_id.tenant_id)); + } else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, + task_id.tenant_id_, + task_id.task_id_, + table_task_status, + table_execution_id))) { + LOG_WARN("select for update failed", K(ret), K(task_id.tenant_id_), K(task_id.task_id_)); + } else { + HEAP_VAR(ObTableRedefinitionTask, redefinition_task) { + ObDDLTaskRecord task_record; + common::ObArenaAllocator allocator(lib::ObLabel("abort_task")); + task_record.reset(); + if (OB_FAIL(task_queue_.modify_task(task_id, [&task_record, &allocator](ObDDLTask &task) -> int { + int ret = OB_SUCCESS; + ObTableRedefinitionTask *table_redefinition_task = static_cast(&task); + if (OB_UNLIKELY(!table_redefinition_task->is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("table rdefinition task is not valid", K(ret)); + } else if (OB_FAIL(table_redefinition_task->convert_to_record(task_record, allocator))) { + LOG_WARN("convert to ddl task record failed", K(ret), K(*table_redefinition_task)); + } + return ret; + }))) { + LOG_WARN("failed to modify task", K(ret)); + } else { + redefinition_task.set_is_abort(true); + if (OB_FAIL(update_task_info(task_id, trans, task_record, + &redefinition_task, + allocator, + [&task_id, this]() -> int { return task_queue_.abort_task(task_id); }))) { + LOG_WARN("update task info failed", K(ret), K(task_id)); + } + } + } + } + if (trans.is_started()) { + bool commit = (OB_SUCCESS == ret); + int tmp_ret = trans.end(commit); + if (OB_SUCCESS != tmp_ret) { + ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } + } + return ret; +} + +int ObDDLScheduler::copy_table_dependents(const ObDDLTaskID &task_id, const bool is_copy_constraints, const bool is_copy_indexes, const bool is_copy_triggers, @@ -915,20 +1000,18 @@ int ObDDLScheduler::copy_table_dependents(const int64_t task_id, ObDDLTask *task = nullptr; int64_t table_task_status = 0; int64_t table_execution_id = 0; - int64_t pos = 0; - ObString message; ObMySQLTransaction trans; - if (OB_UNLIKELY(0 >= task_id || OB_INVALID_ID == tenant_id)) { + if (OB_UNLIKELY(!task_id.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arg", K(ret), K(task_id), K(tenant_id)); - } else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), tenant_id))) { + LOG_WARN("invalid arg", K(ret), K(task_id)); + } else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), task_id.tenant_id_))) { LOG_WARN("start transaction failed", K(ret)); } else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, - tenant_id, - task_id, + task_id.tenant_id_, + task_id.task_id_, table_task_status, table_execution_id))) { - LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id)); + LOG_WARN("select for update failed", K(ret), K(task_id.tenant_id_), K(task_id.task_id_)); } else { HEAP_VAR(ObTableRedefinitionTask, redefinition_task) { ObDDLTaskRecord task_record; @@ -948,92 +1031,59 @@ int ObDDLScheduler::copy_table_dependents(const int64_t task_id, LOG_WARN("failed to modify task", K(ret)); if (OB_ENTRY_NOT_EXIST == ret) { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id, root_service_->get_sql_proxy(), allocator, task_record))) { - LOG_WARN("get single ddl task failed", K(tmp_ret), K(task_id)); + if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id.task_id_, root_service_->get_sql_proxy(), allocator, task_record))) { + LOG_WARN("get single ddl task failed", K(tmp_ret), K(task_id.task_id_)); } else if (OB_TMP_FAIL(schedule_ddl_task(task_record))) { LOG_WARN("failed to schedule ddl task", K(tmp_ret), K(task_record)); } else { ret = OB_SUCCESS; } } - } - if (OB_FAIL(ret)) { - } else if (OB_FAIL(redefinition_task.init(task_record))) { - LOG_WARN("init table redefinition task failed", K(ret)); - } else if (OB_FAIL(redefinition_task.set_trace_id(task_record.trace_id_))) { - LOG_WARN("set trace id failed", K(ret)); } else { redefinition_task.set_is_copy_constraints(is_copy_constraints); redefinition_task.set_is_copy_indexes(is_copy_indexes); redefinition_task.set_is_copy_triggers(is_copy_triggers); redefinition_task.set_is_copy_foreign_keys(is_copy_foreign_keys); redefinition_task.set_is_ignore_errors(is_ignore_errors); - if (OB_FAIL(redefinition_task.convert_to_record(task_record, allocator))) { - LOG_WARN("convert to ddl task record failed", K(ret), K(redefinition_task)); - } else if (OB_UNLIKELY(!task_record.is_valid())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ddl task record is invalid", K(ret), K(task_record)); - } else { - message.assign(task_record.message_.ptr(), task_record.message_.length()); - if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans, tenant_id, task_id, message))) { - LOG_WARN("update task message failed", K(ret), K(tenant_id), K(task_id), K(message)); - } else { - bool commit = (OB_SUCCESS == ret); - int tmp_ret = trans.end(commit); - if (OB_SUCCESS != tmp_ret) { - ret = (OB_SUCCESS == ret) ? tmp_ret : ret; - } - if (OB_SUCC(ret)) { - if (OB_FAIL(task_queue_.update_task_copy_deps_setting(task_id, - is_copy_constraints, - is_copy_indexes, - is_copy_triggers, - is_copy_foreign_keys, - is_ignore_errors))) { - if (OB_ENTRY_NOT_EXIST == ret) { - bool exist = false; - if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(root_service_->get_sql_proxy(), task_id, exist))) { - LOG_WARN("check task id exist fail", K(ret), K(task_id)); - } else { - if (exist) { - ret = OB_EAGAIN; - LOG_INFO("entry exist, the ddl scheduler hasn't recovered the task yet", K(ret), K(task_id)); - } else { - LOG_WARN("this task does not exist in the hash table", K(ret), K(task_id)); - } - } - } - LOG_WARN("update task copy deps setting failed", K(ret), K(task_id)); - } - } - } + if (OB_FAIL(update_task_info(task_id, trans, task_record, + &redefinition_task, + allocator, + [&task_id, &is_copy_constraints, &is_copy_indexes, &is_copy_triggers, &is_copy_foreign_keys, + &is_ignore_errors, this]() -> int { return task_queue_.update_task_copy_deps_setting(task_id, + is_copy_constraints, is_copy_indexes, is_copy_triggers, is_copy_foreign_keys, is_ignore_errors); }))) { + LOG_WARN("update task info failed", K(ret), K(task_id)); } } } } + if (trans.is_started()) { + bool commit = (OB_SUCCESS == ret); + int tmp_ret = trans.end(commit); + if (OB_SUCCESS != tmp_ret) { + ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } + } return ret; } -int ObDDLScheduler::finish_redef_table(const int64_t task_id, const uint64_t tenant_id) +int ObDDLScheduler::finish_redef_table(const ObDDLTaskID &task_id) { int ret = OB_SUCCESS; ObDDLTask *task = nullptr; int64_t table_task_status = 0; int64_t table_execution_id = 0; - int64_t pos = 0; - ObString message; ObMySQLTransaction trans; - if (OB_UNLIKELY(0 >= task_id || OB_INVALID_ID == tenant_id)) { + if (OB_UNLIKELY(!task_id.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arg", K(ret), K(task_id), K(tenant_id)); - } else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), tenant_id))) { + LOG_WARN("invalid arg", K(ret), K(task_id)); + } else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), task_id.tenant_id_))) { LOG_WARN("start transaction failed", K(ret)); } else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, - tenant_id, - task_id, + task_id.tenant_id_, + task_id.task_id_, table_task_status, table_execution_id))) { - LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id)); + LOG_WARN("select for update failed", K(ret), K(task_id)); } else { HEAP_VAR(ObTableRedefinitionTask, redefinition_task) { ObDDLTaskRecord task_record; @@ -1054,58 +1104,32 @@ int ObDDLScheduler::finish_redef_table(const int64_t task_id, const uint64_t ten if (OB_ENTRY_NOT_EXIST == ret) { int tmp_ret = OB_SUCCESS; ObSqlString sql_string; - if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id, root_service_->get_sql_proxy(), allocator, task_record))) { - LOG_WARN("get single ddl task failed", K(tmp_ret), K(task_id)); + if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id.task_id_, root_service_->get_sql_proxy(), allocator, task_record))) { + LOG_WARN("get single ddl task failed", K(tmp_ret), K(task_id.task_id_)); } else if (OB_TMP_FAIL(schedule_ddl_task(task_record))) { LOG_WARN("failed to schedule ddl task", K(tmp_ret), K(task_record)); } else { ret = OB_SUCCESS; } } - } else if (OB_FAIL(redefinition_task.init(task_record))) { - LOG_WARN("init table redefinition task failed", K(ret)); - } else if (OB_FAIL(redefinition_task.set_trace_id(task_record.trace_id_))) { - LOG_WARN("set trace id failed", K(ret)); } else { redefinition_task.set_is_do_finish(true); - if (OB_FAIL(redefinition_task.convert_to_record(task_record, allocator))) { - LOG_WARN("convert to ddl task record failed", K(ret), K(redefinition_task)); - } else if (OB_UNLIKELY(!task_record.is_valid())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("ddl task record is invalid", K(ret), K(task_record)); - } else { - message.assign(task_record.message_.ptr(), task_record.message_.length()); - if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans, tenant_id, task_id, message))) { - LOG_WARN("update task message failed", K(ret), K(tenant_id), K(task_id), K(message)); - } else { - bool commit = (OB_SUCCESS == ret); - int tmp_ret = trans.end(commit); - if (OB_SUCCESS != tmp_ret) { - ret = (OB_SUCCESS == ret) ? tmp_ret : ret; - } - if (OB_SUCC(ret)) { - if (OB_FAIL(task_queue_.update_task_process_schedulable(task_id))) { - if (OB_ENTRY_NOT_EXIST == ret) { - bool exist = false; - if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(root_service_->get_sql_proxy(), task_id, exist))) { - LOG_WARN("check task id exist fail", K(ret), K(task_id)); - } else { - if (exist) { - ret = OB_EAGAIN; - LOG_INFO("entry exist, the ddl scheduler hasn't recovered the task yet", K(ret), K(task_id)); - } else { - LOG_WARN("this task does not exist in the hash table", K(ret), K(task_id)); - } - } - } - LOG_WARN("update task process schedulable failed", K(task_id)); - } - } - } + if (OB_FAIL(update_task_info(task_id, trans, task_record, + &redefinition_task, + allocator, + [&task_id, this]() -> int { return task_queue_.update_task_process_schedulable(task_id); }))) { + LOG_WARN("update task info failed", K(ret), K(task_id)); } } } } + if (trans.is_started()) { + bool commit = (OB_SUCCESS == ret); + int tmp_ret = trans.end(commit); + if (OB_SUCCESS != tmp_ret) { + ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } + } return ret; } @@ -1590,7 +1614,7 @@ int ObDDLScheduler::recover_task() int ObDDLScheduler::remove_inactive_ddl_task() { int ret = OB_SUCCESS; - ObArray remove_task_ids; + ObArray remove_task_ids; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); @@ -1600,7 +1624,7 @@ int ObDDLScheduler::remove_inactive_ddl_task() } else { LOG_INFO("need remove task", K(remove_task_ids)); for (int64_t i = 0; OB_SUCC(ret) && i < remove_task_ids.size(); i++) { - int64_t remove_task_id = 0; + ObDDLTaskID remove_task_id; if (OB_FAIL(remove_task_ids.at(i, remove_task_id))) { LOG_WARN("get remove task id fail", K(ret)); } else if (OB_FAIL(abort_redef_table(remove_task_id))) { @@ -1759,9 +1783,8 @@ int ObDDLScheduler::schedule_table_redefinition_task(const ObDDLTaskRecord &task if (OB_ENTRY_EXIST != ret) { LOG_WARN("inner schedule task failed", K(ret), K(*redefinition_task)); } - } else if ((ObDDLType::DDL_DIRECT_LOAD == task_record.ddl_type_ - || ObDDLType::DDL_DIRECT_LOAD_INSERT == task_record.ddl_type_) - && OB_FAIL(manager_reg_heart_beat_task_.update_task_active_time(task_record.task_id_))) { + } else if (ObDDLType::DDL_DIRECT_LOAD == task_record.ddl_type_ + && OB_FAIL(manager_reg_heart_beat_task_.update_task_active_time(ObDDLTaskID(task_record.tenant_id_, task_record.task_id_)))) { LOG_WARN("register_task_time recover fail", K(ret)); } if (OB_FAIL(ret) && nullptr != redefinition_task) { @@ -2096,7 +2119,7 @@ int ObDDLScheduler::on_sstable_complement_job_reply( } int ObDDLScheduler::on_ddl_task_finish( - const int64_t parent_task_id, + const ObDDLTaskID &parent_task_id, const ObDDLTaskKey &child_task_key, const int ret_code, const ObCurTraceId::TraceId &parent_task_trace_id) @@ -2105,7 +2128,7 @@ int ObDDLScheduler::on_ddl_task_finish( if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObDDLScheduler has not been inited", K(ret)); - } else if (OB_UNLIKELY(parent_task_id <= 0 || !child_task_key.is_valid())) { + } else if (OB_UNLIKELY(!parent_task_id.is_valid() || !child_task_key.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(parent_task_id), K(child_task_key)); } else { @@ -2185,13 +2208,13 @@ void ObDDLScheduler::destroy_all_tasks() } } -int ObDDLScheduler::update_ddl_task_active_time(const int64_t task_id) +int ObDDLScheduler::update_ddl_task_active_time(const ObDDLTaskID &task_id) { int ret = OB_SUCCESS; if (!is_inited_) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); - } else if (task_id <= 0) { + } else if (!task_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(task_id)); } else if (OB_FAIL(manager_reg_heart_beat_task_.update_task_active_time(task_id))) { diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index cd5a03030..d9420968a 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -62,22 +62,22 @@ public: template int modify_task(const ObDDLTaskKey &task_key, F &&op); template - int modify_task(const int64_t task_id, F &&op); - int update_task_copy_deps_setting(const int64_t task_id, + int modify_task(const ObDDLTaskID &task_id, F &&op); + int update_task_copy_deps_setting(const ObDDLTaskID &task_id, const bool is_copy_constraints, const bool is_copy_indexes, const bool is_copy_triggers, const bool is_copy_foreign_keys, const bool is_ignore_errors); - int update_task_process_schedulable(const int64_t task_id); - int abort_task(const int64_t task_id); + int update_task_process_schedulable(const ObDDLTaskID &task_id); + int abort_task(const ObDDLTaskID &task_id); int64_t get_task_cnt() const { return task_list_.get_size(); } void destroy(); private: typedef common::ObDList TaskList; typedef common::hash::ObHashMap TaskKeyMap; - typedef common::hash::ObHashMap TaskIdMap; TaskList task_list_; TaskKeyMap task_map_; @@ -92,12 +92,12 @@ public: ObDDLTaskHeartBeatMananger(); ~ObDDLTaskHeartBeatMananger(); int init(); - int update_task_active_time(const int64_t task_id); - int remove_task(const int64_t task_id); - int get_inactive_ddl_task_ids(ObArray& remove_task_ids); + int update_task_active_time(const ObDDLTaskID &task_id); + int remove_task(const ObDDLTaskID &task_id); + int get_inactive_ddl_task_ids(ObArray& remove_task_ids); private: static const int64_t BUCKET_LOCK_BUCKET_CNT = 10243L; - common::hash::ObHashMap register_task_time_; + common::hash::ObHashMap register_task_time_; bool is_inited_; common::ObBucketLock bucket_lock_; }; @@ -194,7 +194,7 @@ public: const ObDDLTaskInfo &addition_info); int on_ddl_task_finish( - const int64_t parent_task_id, + const ObDDLTaskID &parent_task_id, const ObDDLTaskKey &task_key, const int ret_code, const ObCurTraceId::TraceId &parent_task_trace_id); @@ -203,18 +203,25 @@ public: const ObDDLTaskKey &task_key, const uint64_t autoinc_val, const int ret_code); - int abort_redef_table(const int64_t task_id); + template + int update_task_info(const ObDDLTaskID &task_id, + ObMySQLTransaction &trans, + ObDDLTaskRecord &task_record, + ObTableRedefinitionTask *ddl_task, + common::ObArenaAllocator &allocator, + F &&modify_info); - int copy_table_dependents(const int64_t task_id, - const uint64_t tenant_id, + int abort_redef_table(const ObDDLTaskID &task_id); + + int copy_table_dependents(const ObDDLTaskID &task_id, const bool is_copy_constraints, const bool is_copy_indexes, const bool is_copy_triggers, const bool is_copy_foreign_keys, const bool is_ignore_errors); - int finish_redef_table(const int64_t task_id, const uint64_t tenant_id); + int finish_redef_table(const ObDDLTaskID &task_id); int start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res); - int update_ddl_task_active_time(const int64_t task_id); + int update_ddl_task_active_time(const ObDDLTaskID &task_id); int prepare_alter_table_arg(const ObPrepareAlterTableArgParam ¶m, const ObTableSchema *target_table_schema, diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 3c3f47fc3..db8b40710 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -85,6 +85,65 @@ int ObDDLTaskKey::assign(const ObDDLTaskKey &other) return ret; } +ObDDLTaskID::ObDDLTaskID() + : tenant_id_(OB_INVALID_TENANT_ID), task_id_(0) +{ +} + +ObDDLTaskID::ObDDLTaskID(const uint64_t tenant_id, const int64_t task_id) + : tenant_id_(tenant_id), task_id_(task_id) +{ +} + +uint64_t ObDDLTaskID::hash() const +{ + uint64_t hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), 0); + hash_val = murmurhash(&task_id_, sizeof(task_id_), hash_val); + return hash_val; +} + +bool ObDDLTaskID::operator==(const ObDDLTaskID &other) const +{ + return tenant_id_ == other.tenant_id_ && task_id_ == other.task_id_; +} + +bool ObDDLTaskID::operator!=(const ObDDLTaskID &other) const +{ + return !(*this == other); +} + +int ObDDLTaskID::assign(const ObDDLTaskID &other) +{ + int ret = OB_SUCCESS; + if (!other.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(other)); + } else { + tenant_id_ = other.tenant_id_; + task_id_ = other.task_id_; + } + return ret; +} + +ObDDLTaskSerializeField::ObDDLTaskSerializeField(const int64_t task_version, const int64_t parallelism, + const int64_t data_format_version, const bool is_abort) +{ + task_version_ = task_version; + parallelism_ = parallelism; + data_format_version_ = data_format_version; + is_abort_ = is_abort; +} + +void ObDDLTaskSerializeField::reset() +{ + task_version_ = 0; + parallelism_ = 0; + data_format_version_ = 0; + is_abort_ = false; +} + +OB_SERIALIZE_MEMBER(ObDDLTaskSerializeField, task_version_, parallelism_, data_format_version_, is_abort_); + ObCreateDDLTaskParam::ObCreateDDLTaskParam() : tenant_id_(OB_INVALID_ID), object_id_(OB_INVALID_ID), schema_version_(0), parallelism_(0), parent_task_id_(0), type_(DDL_INVALID), src_table_schema_(nullptr), dest_table_schema_(nullptr), ddl_arg_(nullptr), allocator_(nullptr) @@ -750,6 +809,44 @@ int ObDDLTask::set_ddl_stmt_str(const ObString &ddl_stmt_str) return ret; } +int ObDDLTask::serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const +{ + int ret = OB_SUCCESS; + ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, is_abort_); + if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_size)); + } else if (OB_FAIL(serialize_field.serialize(buf, buf_size, pos))) { + LOG_WARN("serialize_field serialize failed", K(ret)); + } + return ret; +} + +int ObDDLTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) +{ + int ret = OB_SUCCESS; + ObDDLTaskSerializeField serialize_field; + serialize_field.reset(); + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arguments", K(ret), KP(buf), K(tenant_id), K(buf_size)); + } else if (OB_FAIL(serialize_field.deserialize(buf, buf_size, pos))) { + LOG_WARN("serialize_field deserialize failed", K(ret)); + } else { + task_version_ = serialize_field.task_version_; + parallelism_ = serialize_field.parallelism_; + data_format_version_ = serialize_field.data_format_version_; + is_abort_ = serialize_field.is_abort_; + } + return ret; +} + +int64_t ObDDLTask::get_serialize_param_size() const +{ + ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, is_abort_); + return serialize_field.get_serialize_size(); +} + int ObDDLTask::convert_to_record( ObDDLTaskRecord &task_record, common::ObIAllocator &allocator) @@ -829,8 +926,8 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable ObDDLTaskStatus real_new_status = new_status; const ObDDLTaskStatus old_status = task_status_; const bool error_need_retry = OB_SUCCESS != ret_code && is_error_need_retry(ret_code); - if (OB_TMP_FAIL(SYS_TASK_STATUS_MGR.is_task_cancel(trace_id_, is_cancel))) { - LOG_WARN("check task is canceled", K(tmp_ret), K(trace_id_)); + if (OB_TMP_FAIL(check_ddl_task_is_cancel(trace_id_, is_cancel))) { + LOG_WARN("check ddl task is cancel failed", K(tmp_ret), K_(trace_id)); } if (is_cancel) { real_ret_code = (OB_SUCCESS == ret_code || error_need_retry) ? OB_CANCELED : ret_code; @@ -1039,6 +1136,20 @@ int ObDDLTask::report_error_code(const ObString &forward_user_message, const int return ret; } +int ObDDLTask::check_ddl_task_is_cancel(const TraceId &trace_id, bool &is_cancel) +{ + int ret = OB_SUCCESS; + if (get_is_abort()) { + is_cancel = true; + } else if (trace_id.is_invalid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("trace id is invalid", K(ret), K(trace_id)); + } else if (OB_FAIL(SYS_TASK_STATUS_MGR.is_task_cancel(trace_id, is_cancel))) { + LOG_WARN("failed to check task is cancel", K(ret), K(trace_id)); + } + return ret; +} + // wait trans end, but not hold snapshot. int ObDDLTask::wait_trans_end( ObDDLWaitTransEndCtx &wait_trans_ctx, diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index fb534f188..14b99d82a 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -46,6 +46,23 @@ public: int64_t schema_version_; }; +struct ObDDLTaskID final +{ +public: + ObDDLTaskID(); + ObDDLTaskID(const uint64_t tenant_id, const int64_t task_id); + ~ObDDLTaskID() = default; + uint64_t hash() const; + bool operator==(const ObDDLTaskID &other) const; + bool operator!=(const ObDDLTaskID &other) const; + bool is_valid() const { return OB_INVALID_TENANT_ID != tenant_id_ && task_id_ > 0; } + int assign(const ObDDLTaskID &other); + TO_STRING_KV(K_(tenant_id), K_(task_id)); +public: + uint64_t tenant_id_; + int64_t task_id_; +}; + struct ObDDLTaskRecord final { public: @@ -87,6 +104,22 @@ public: int64_t row_inserted_; }; +struct ObDDLTaskSerializeField final +{ + OB_UNIS_VERSION(1); +public: + TO_STRING_KV(K_(task_version), K_(parallelism), K_(data_format_version), K_(is_abort)); + ObDDLTaskSerializeField() : task_version_(0), parallelism_(0), data_format_version_(0), is_abort_(false) {} + ObDDLTaskSerializeField(const int64_t task_version, const int64_t parallelism, + const int64_t data_format_version, const bool is_abort); + ~ObDDLTaskSerializeField() = default; + void reset(); +public: + int64_t task_version_; + int64_t parallelism_; + int64_t data_format_version_; + bool is_abort_; +}; struct ObCreateDDLTaskParam final { public: @@ -384,7 +417,7 @@ class ObDDLTask : public common::ObDLinkBase { public: explicit ObDDLTask(const share::ObDDLType task_type) - : lock_(), ddl_tracing_(this), is_inited_(false), need_retry_(true), is_running_(false), + : lock_(), ddl_tracing_(this), is_inited_(false), need_retry_(true), is_running_(false), is_abort_(false), task_type_(task_type), trace_id_(), tenant_id_(0), object_id_(0), schema_version_(0), target_object_id_(0), task_status_(share::ObDDLTaskStatus::PREPARE), snapshot_version_(0), ret_code_(OB_SUCCESS), task_id_(0), parent_task_id_(0), parent_task_key_(), task_version_(0), parallelism_(0), @@ -402,6 +435,8 @@ public: share::ObDDLType get_task_type() const { return task_type_; } void set_not_running() { ATOMIC_SET(&is_running_, false); } void set_task_status(const share::ObDDLTaskStatus new_status) {task_status_ = new_status; } + void set_is_abort(const bool is_abort) { is_abort_ = is_abort; } + bool get_is_abort() { return is_abort_; } bool try_set_running() { return !ATOMIC_CAS(&is_running_, false, true); } uint64_t get_tenant_id() const { return tenant_id_; } uint64_t get_object_id() const { return object_id_; } @@ -412,6 +447,7 @@ public: int get_ddl_type_str(const int64_t ddl_type, const char *&ddl_type_str); int64_t get_ret_code() const { return ret_code_; } int64_t get_task_id() const { return task_id_; } + ObDDLTaskID get_ddl_task_id() const { return ObDDLTaskID(tenant_id_, task_id_); } ObDDLTaskKey get_task_key() const { return ObDDLTaskKey(target_object_id_, schema_version_); } int64_t get_parent_task_id() const { return parent_task_id_; } int64_t get_task_version() const { return task_version_; } @@ -423,9 +459,9 @@ public: share::ObDDLLongopsStat *get_longops_stat() const { return longops_stat_; } int64_t get_data_format_version() const { return data_format_version_; } static int fetch_new_task_id(ObMySQLProxy &sql_proxy, int64_t &new_task_id); - virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const = 0; - virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) = 0; - virtual int64_t get_serialize_param_size() const = 0; + virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const; + virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos); + virtual int64_t get_serialize_param_size() const; const ObString &get_ddl_stmt_str() const { return ddl_stmt_str_; } int set_ddl_stmt_str(const ObString &ddl_stmt_str); int convert_to_record(ObDDLTaskRecord &task_record, common::ObIAllocator &allocator); @@ -434,6 +470,7 @@ public: int refresh_schema_version(); int remove_task_record(); int report_error_code(const ObString &forward_user_message, const int64_t affected_rows = 0); + int check_ddl_task_is_cancel(const TraceId &trace_id, bool &is_cancel); int wait_trans_end( ObDDLWaitTransEndCtx &wait_trans_ctx, const share::ObDDLTaskStatus next_task_status); @@ -507,6 +544,7 @@ protected: bool is_inited_; bool need_retry_; bool is_running_; + bool is_abort_; share::ObDDLType task_type_; TraceId trace_id_; uint64_t tenant_id_; diff --git a/src/rootserver/ddl_task/ob_drop_index_task.cpp b/src/rootserver/ddl_task/ob_drop_index_task.cpp index 69dc83ccd..934cca3c3 100644 --- a/src/rootserver/ddl_task/ob_drop_index_task.cpp +++ b/src/rootserver/ddl_task/ob_drop_index_task.cpp @@ -289,7 +289,8 @@ int ObDropIndexTask::cleanup_impl() } if (OB_SUCC(ret) && parent_task_id_ > 0) { - root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id_, get_task_key(), ret_code_, trace_id_); + const ObDDLTaskID parent_task_id(tenant_id_, parent_task_id_); + root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id, get_task_key(), ret_code_, trace_id_); } LOG_INFO("clean task finished", K(ret), K(*this)); return ret; @@ -412,6 +413,8 @@ int ObDropIndexTask::serialize_params_to_message(char *buf, const int64_t buf_si if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), KP(buf), K(buf_size)); + } else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_size, pos))) { + LOG_WARN("ObDDLTask serialize failed", K(ret)); } else if (OB_FAIL(drop_index_arg_.serialize(buf, buf_size, pos))) { LOG_WARN("serialize failed", K(ret)); } @@ -424,7 +427,9 @@ int ObDropIndexTask::deserlize_params_from_message(const uint64_t tenant_id, con obrpc::ObDropIndexArg tmp_drop_index_arg; if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(buf_size)); + LOG_WARN("invalid arg", K(ret), K(tenant_id), KP(buf), K(buf_size)); + } else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, buf_size, pos))) { + LOG_WARN("ObDDLTask deserlize failed", K(ret)); } else if (OB_FAIL(tmp_drop_index_arg.deserialize(buf, buf_size, pos))) { LOG_WARN("deserialize failed", K(ret)); } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_drop_index_arg))) { @@ -437,7 +442,7 @@ int ObDropIndexTask::deserlize_params_from_message(const uint64_t tenant_id, con int64_t ObDropIndexTask::get_serialize_param_size() const { - return drop_index_arg_.get_serialize_size(); + return drop_index_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size(); } void ObDropIndexTask::flt_set_task_span_tag() const diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index f5d7813d1..1619ba57d 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -1356,7 +1356,8 @@ int ObIndexBuildTask::cleanup_impl() } if (OB_SUCC(ret) && parent_task_id_ > 0) { - root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id_, get_task_key(), ret_code_, trace_id_); + const ObDDLTaskID parent_task_id(tenant_id_, parent_task_id_); + root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id, get_task_key(), ret_code_, trace_id_); } LOG_INFO("clean task finished", K(ret), K(*this)); return ret; @@ -1470,13 +1471,12 @@ int ObIndexBuildTask::serialize_params_to_message(char *buf, const int64_t buf_l if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len)); - } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) { - LOG_WARN("fail to serialize task version", K(ret), K(task_version_)); + } else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) { + LOG_WARN("ObDDLTask serialize failed", K(ret)); } else if (OB_FAIL(create_index_arg_.serialize(buf, buf_len, pos))) { LOG_WARN("serialize create index arg failed", K(ret)); } else { LST_DO_CODE(OB_UNIS_ENCODE, check_unique_snapshot_); - LST_DO_CODE(OB_UNIS_ENCODE, parallelism_, data_format_version_); } return ret; } @@ -1488,8 +1488,8 @@ int ObIndexBuildTask::deserlize_params_from_message(const uint64_t tenant_id, co if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); - } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { - LOG_WARN("fail to deserialize task version", K(ret)); + } else if (ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos)) { + LOG_WARN("ObDDLTask deserlize failed", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("deserialize table failed", K(ret)); } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { @@ -1498,7 +1498,6 @@ int ObIndexBuildTask::deserlize_params_from_message(const uint64_t tenant_id, co LOG_WARN("deep copy create index arg failed", K(ret)); } else { LST_DO_CODE(OB_UNIS_DECODE, check_unique_snapshot_); - LST_DO_CODE(OB_UNIS_DECODE, parallelism_, data_format_version_); } return ret; } @@ -1507,7 +1506,5 @@ int64_t ObIndexBuildTask::get_serialize_param_size() const { return create_index_arg_.get_serialize_size() + serialization::encoded_length_i64(check_unique_snapshot_) - + serialization::encoded_length_i64(task_version_) - + serialization::encoded_length_i64(parallelism_) - + serialization::encoded_length_i64(data_format_version_); + + ObDDLTask::get_serialize_param_size(); } diff --git a/src/rootserver/ddl_task/ob_index_build_task.h b/src/rootserver/ddl_task/ob_index_build_task.h index df5064ba4..c2fc3ee24 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.h +++ b/src/rootserver/ddl_task/ob_index_build_task.h @@ -47,6 +47,7 @@ public: int set_nls_format(const ObString &nls_date_format, const ObString &nls_timestamp_format, const ObString &nls_timestamp_tz_format); + ObDDLTaskID get_ddl_task_id() { return ObDDLTaskID(tenant_id_, task_id_); } virtual int process() override; virtual int64_t get_deep_copy_size() const override { return sizeof(*this); } virtual ObAsyncTask *deep_copy(char *buf, const int64_t buf_size) const override; diff --git a/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp b/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp index c51207d96..447fd3b75 100644 --- a/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp +++ b/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp @@ -591,8 +591,8 @@ int ObModifyAutoincTask::serialize_params_to_message(char *buf, const int64_t bu if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len)); - } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) { - LOG_WARN("fail to serialize task version", K(ret), K(task_version_)); + } else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) { + LOG_WARN("fail to serialize ObDDLTask", K(ret)); } else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) { LOG_WARN("serialize table arg failed", K(ret)); } @@ -606,8 +606,8 @@ int ObModifyAutoincTask::deserlize_params_from_message(const uint64_t tenant_id, if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); - } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { - LOG_WARN("fail to deserialize task version", K(ret)); + } else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos))) { + LOG_WARN("ObDDLTask deserlize failed", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("serialize table failed", K(ret)); } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { @@ -620,7 +620,7 @@ int ObModifyAutoincTask::deserlize_params_from_message(const uint64_t tenant_id, int64_t ObModifyAutoincTask::get_serialize_param_size() const { - return alter_table_arg_.get_serialize_size() + serialization::encoded_length_i64(task_version_); + return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size(); } void ObModifyAutoincTask::flt_set_task_span_tag() const diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 1ee1135ca..421313cfe 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -928,8 +928,7 @@ int64_t ObTableRedefinitionTask::get_serialize_param_size() const int8_t copy_foreign_keys = static_cast(is_copy_foreign_keys_); int8_t ignore_errors = static_cast(is_ignore_errors_); int8_t do_finish = static_cast(is_do_finish_); - return alter_table_arg_.get_serialize_size() + serialization::encoded_length_i64(task_version_) - + serialization::encoded_length_i64(parallelism_) + serialization::encoded_length_i64(data_format_version_) + return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size() + serialization::encoded_length_i8(copy_indexes) + serialization::encoded_length_i8(copy_triggers) + serialization::encoded_length_i8(copy_constraints) + serialization::encoded_length_i8(copy_foreign_keys) + serialization::encoded_length_i8(ignore_errors) + serialization::encoded_length_i8(do_finish); @@ -947,14 +946,10 @@ int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_ if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len)); - } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) { - LOG_WARN("fail to serialize task version", K(ret), K(task_version_)); + } else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) { + LOG_WARN("ObDDLTask serialize failed", K(ret)); } else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) { LOG_WARN("serialize table arg failed", K(ret)); - } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, parallelism_))) { - LOG_WARN("fail to serialize parallelism_", K(ret)); - } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, data_format_version_))) { - LOG_WARN("fail to serialize parallelism_", K(ret)); } else if (OB_FAIL(serialization::encode_i8(buf, buf_len, pos, copy_indexes))) { LOG_WARN("fail to serialize is_copy_indexes", K(ret)); } else if (OB_FAIL(serialization::encode_i8(buf, buf_len, pos, copy_triggers))) { @@ -986,18 +981,14 @@ int ObTableRedefinitionTask::deserlize_params_from_message(const uint64_t tenant if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len)); - } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) { - LOG_WARN("fail to deserialize task version", K(ret)); + } else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos))) { + LOG_WARN("ObDDLTask deserlize failed", K(ret)); } else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) { LOG_WARN("serialize table failed", K(ret)); } else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) { LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg)); } else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) { LOG_WARN("deep copy table arg failed", K(ret)); - } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, ¶llelism_))) { - LOG_WARN("fail to deserialize parallelism", K(ret)); - } else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &data_format_version_))) { - LOG_WARN("fail to deserialize data format version", K(ret)); } else if (pos < data_len) { if (OB_FAIL(serialization::decode_i8(buf, data_len, pos, ©_indexes))) { LOG_WARN("fail to deserialize is_copy_indexes_", K(ret)); diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index f8a170076..eed0211a4 100644 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -3910,8 +3910,8 @@ int ObRootService::update_ddl_task_active_time(const obrpc::ObUpdateDDLTaskActiv } else if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(arg)); - } else if (OB_FAIL(ddl_scheduler_.update_ddl_task_active_time(task_id))) { - LOG_WARN("fail to set RegTaskTime map", K(ret), K(task_id)); + } else if (OB_FAIL(ddl_scheduler_.update_ddl_task_active_time(ObDDLTaskID(tenant_id, task_id)))) { + LOG_WARN("fail to set RegTaskTime map", K(ret), K(tenant_id), K(task_id)); } return ret; } @@ -3934,8 +3934,8 @@ int ObRootService::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg) } else if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(arg)); - } else if (OB_FAIL(ddl_scheduler_.abort_redef_table(task_id))) { - LOG_WARN("cancel task failed", K(ret), K(task_id)); + } else if (OB_FAIL(ddl_scheduler_.abort_redef_table(ObDDLTaskID(tenant_id, task_id)))) { + LOG_WARN("cancel task failed", K(ret), K(tenant_id), K(task_id)); } return ret; } @@ -3958,7 +3958,7 @@ int ObRootService::finish_redef_table(const obrpc::ObFinishRedefTableArg &arg) } else if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(arg)); - } else if (OB_FAIL(ddl_scheduler_.finish_redef_table(task_id, tenant_id))) { + } else if (OB_FAIL(ddl_scheduler_.finish_redef_table(ObDDLTaskID(tenant_id, task_id)))) { LOG_WARN("failed to finish redef table", K(ret), K(task_id), K(tenant_id)); } return ret; @@ -3987,8 +3987,7 @@ int ObRootService::copy_table_dependents(const obrpc::ObCopyTableDependentsArg & } else if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arg", K(ret), K(arg)); - } else if (OB_FAIL(ddl_scheduler_.copy_table_dependents(task_id, - tenant_id, + } else if (OB_FAIL(ddl_scheduler_.copy_table_dependents(ObDDLTaskID(tenant_id, task_id), is_copy_constraints, is_copy_indexes, is_copy_triggers,