fix task aborted status is not persisted to the internal table bug

This commit is contained in:
obdev 2023-03-21 17:00:23 +00:00 committed by ob-robot
parent 1ec233bb5d
commit d760428489
14 changed files with 401 additions and 235 deletions

View File

@ -484,12 +484,10 @@ int ObColumnRedefinitionTask::serialize_params_to_message(char *buf, const int64
if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) {
LOG_WARN("fail to serialize task version", K(ret), K(task_version_));
} else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) {
LOG_WARN("ObDDLTask serialize failed", K(ret));
} else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) {
LOG_WARN("serialize table arg failed", K(ret));
} else {
LST_DO_CODE(OB_UNIS_ENCODE, parallelism_, data_format_version_);
LOG_WARN("alter_table_arg_ serialize failed", K(ret));
}
return ret;
}
@ -500,27 +498,22 @@ int ObColumnRedefinitionTask::deserlize_params_from_message(const uint64_t tenan
obrpc::ObAlterTableArg tmp_arg;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
LOG_WARN("invalid arguments", K(ret), KP(buf), K(tenant_id), K(data_len));
} else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos))) {
LOG_WARN("ObDDLTask deserlize failed", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) {
LOG_WARN("deep copy table arg failed", K(ret));
} else {
LST_DO_CODE(OB_UNIS_DECODE, parallelism_, data_format_version_);
}
return ret;
}
int64_t ObColumnRedefinitionTask::get_serialize_param_size() const
{
return alter_table_arg_.get_serialize_size()
+ serialization::encoded_length_i64(task_version_)
+ serialization::encoded_length_i64(parallelism_)
+ serialization::encoded_length_i64(data_format_version_);
return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size();
}
int ObColumnRedefinitionTask::copy_table_dependent_objects(const ObDDLTaskStatus next_task_status)

View File

@ -837,7 +837,8 @@ int ObConstraintTask::cleanup_impl()
}
if (OB_SUCC(ret) && parent_task_id_ > 0) {
root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id_, get_task_key(), ret_code_, trace_id_);
const ObDDLTaskID parent_task_id(tenant_id_, parent_task_id_);
root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id, get_task_key(), ret_code_, trace_id_);
}
return ret;
}
@ -1811,8 +1812,8 @@ int ObConstraintTask::serialize_params_to_message(char *buf, const int64_t buf_l
if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) {
LOG_WARN("fail to serialize task version", K(ret), K(task_version_));
} else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) {
LOG_WARN("ObDDLTask serialize failed", K(ret));
} else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) {
LOG_WARN("serialize table arg failed", K(ret));
}
@ -1826,8 +1827,8 @@ int ObConstraintTask::deserlize_params_from_message(const uint64_t tenant_id, co
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos))) {
LOG_WARN("ObDDLTask deserlize failed", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
@ -1840,9 +1841,8 @@ int ObConstraintTask::deserlize_params_from_message(const uint64_t tenant_id, co
int64_t ObConstraintTask::get_serialize_param_size() const
{
return alter_table_arg_.get_serialize_size() + serialization::encoded_length_i64(task_version_);
return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size();
}
void ObConstraintTask::flt_set_task_span_tag() const
{
FLT_SET_TAG(ddl_task_id, task_id_, ddl_parent_task_id, parent_task_id_,

View File

@ -42,6 +42,7 @@ public:
const ObTableSchema &orig_table_schema,
const AlterTableSchema &alter_table_schema,
const ObTimeZoneInfoWrap &tz_info_wrap);
ObDDLTaskID get_ddl_task_id() { return ObDDLTaskID(tenant_id_, task_id_); }
virtual ~ObDDLRedefinitionSSTableBuildTask() = default;
virtual int process() override;
virtual int64_t get_deep_copy_size() const override { return sizeof(*this); }

View File

@ -606,8 +606,8 @@ int ObDDLRetryTask::serialize_params_to_message(char *buf, const int64_t buf_siz
if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_size));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_size, pos, task_version_))) {
LOG_WARN("fail to serialize task version", K(ret), K(task_version_));
} else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_size, pos))) {
LOG_WARN("ObDDLTask serialize failed", K(ret));
} else if (OB_FAIL(ddl_arg_->serialize(buf, buf_size, pos))) {
LOG_WARN("serialize table arg failed", K(ret));
}
@ -620,8 +620,8 @@ int ObDDLRetryTask::deserlize_params_from_message(const uint64_t tenant_id, cons
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(buf_size));
} else if (OB_FAIL(serialization::decode_i64(buf, buf_size, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, buf_size, pos))) {
LOG_WARN("fail to deserialize ObDDLTask", K(ret));
} else if (ObDDLType::DDL_DROP_DATABASE == task_type_) {
obrpc::ObDropDatabaseArg tmp_arg;
if (OB_FAIL(tmp_arg.deserialize(buf, buf_size, pos))) {
@ -667,7 +667,7 @@ int ObDDLRetryTask::deserlize_params_from_message(const uint64_t tenant_id, cons
int64_t ObDDLRetryTask::get_serialize_param_size() const
{
int64_t serialize_param_size = serialization::encoded_length_i64(task_version_);
int64_t serialize_param_size = ObDDLTask::get_serialize_param_size();
if (OB_NOT_NULL(ddl_arg_)) {
serialize_param_size += ddl_arg_->get_serialize_size();
}

View File

@ -103,7 +103,7 @@ int ObDDLTaskQueue::push_task(ObDDLTask *task)
} else {
task_add_to_map = true;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(task_id_map_.set_refactored(task->get_task_id(), task, is_overwrite))) {
} else if (OB_FAIL(task_id_map_.set_refactored(task->get_ddl_task_id(), task, is_overwrite))) {
if (common::OB_HASH_EXIST == ret) {
ret = common::OB_ENTRY_EXIST;
} else {
@ -160,7 +160,7 @@ int ObDDLTaskQueue::remove_task(ObDDLTask *task)
} else {
LOG_INFO("succ to remove task", K(*task), KP(task));
}
if (OB_SUCCESS != (tmp_ret = task_id_map_.erase_refactored(task->get_task_id()))) {
if (OB_SUCCESS != (tmp_ret = task_id_map_.erase_refactored(task->get_ddl_task_id()))) {
LOG_WARN("erase task from map failed", K(ret));
ret = OB_SUCCESS == ret ? tmp_ret : ret;
}
@ -209,7 +209,7 @@ int ObDDLTaskQueue::modify_task(const ObDDLTaskKey &task_key, F &&op)
}
template<typename F>
int ObDDLTaskQueue::modify_task(const int64_t task_id, F &&op)
int ObDDLTaskQueue::modify_task(const ObDDLTaskID &task_id, F &&op)
{
int ret = OB_SUCCESS;
common::ObSpinLockGuard guard(lock_);
@ -217,7 +217,7 @@ int ObDDLTaskQueue::modify_task(const int64_t task_id, F &&op)
if (OB_UNLIKELY(!is_inited_)) {
ret = common::OB_NOT_INIT;
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
} else if (OB_UNLIKELY(task_id < 0)) {
} else if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(task_id));
} else if (OB_FAIL(task_id_map_.get_refactored(task_id, task))) {
@ -232,7 +232,7 @@ int ObDDLTaskQueue::modify_task(const int64_t task_id, F &&op)
return ret;
}
int ObDDLTaskQueue::update_task_copy_deps_setting(const int64_t task_id,
int ObDDLTaskQueue::update_task_copy_deps_setting(const ObDDLTaskID &task_id,
const bool is_copy_constraints,
const bool is_copy_indexes,
const bool is_copy_triggers,
@ -246,7 +246,7 @@ int ObDDLTaskQueue::update_task_copy_deps_setting(const int64_t task_id,
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
} else if (OB_UNLIKELY(task_id <= 0)) {
} else if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(task_id_map_.get_refactored(task_id, task))) {
@ -265,7 +265,7 @@ int ObDDLTaskQueue::update_task_copy_deps_setting(const int64_t task_id,
return ret;
}
int ObDDLTaskQueue::update_task_process_schedulable(const int64_t task_id)
int ObDDLTaskQueue::update_task_process_schedulable(const ObDDLTaskID &task_id)
{
int ret = OB_SUCCESS;
ObDDLTask *ddl_task = nullptr;
@ -274,7 +274,7 @@ int ObDDLTaskQueue::update_task_process_schedulable(const int64_t task_id)
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
} else if (OB_UNLIKELY(task_id <= 0)) {
} else if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(task_id_map_.get_refactored(task_id, ddl_task))) {
@ -289,7 +289,7 @@ int ObDDLTaskQueue::update_task_process_schedulable(const int64_t task_id)
return ret;
}
int ObDDLTaskQueue::abort_task(const int64_t task_id)
int ObDDLTaskQueue::abort_task(const ObDDLTaskID &task_id)
{
int ret = OB_SUCCESS;
share::ObTaskId trace_id;
@ -298,7 +298,7 @@ int ObDDLTaskQueue::abort_task(const int64_t task_id)
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
} else if (OB_UNLIKELY(task_id <= 0)) {
} else if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(task_id_map_.get_refactored(task_id, ddl_task))) {
@ -308,6 +308,7 @@ int ObDDLTaskQueue::abort_task(const int64_t task_id)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl_task is null", K(ret));
} else {
ddl_task->set_is_abort(true);
trace_id.set(ddl_task->get_trace_id());
if (OB_UNLIKELY(trace_id.is_invalid())) {
ret = OB_INVALID_ARGUMENT;
@ -346,31 +347,37 @@ int ObDDLTaskHeartBeatMananger::init()
return ret;
}
int ObDDLTaskHeartBeatMananger::update_task_active_time(const int64_t task_id)
int ObDDLTaskHeartBeatMananger::update_task_active_time(const ObDDLTaskID &task_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObManagerRegisterHeartBeatTask not inited", K(ret));
} else if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_INFO("invalid argument", K(ret), K(task_id));
} else {
ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id);
ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id.task_id_);
// setting flag=1 to update the old time-value in the hash map with current time
if (OB_FAIL(register_task_time_.set_refactored(task_id,
ObTimeUtility::current_time(), 1, 0, 0))) {
LOG_WARN("set register task time failed", K(ret));
LOG_WARN("set register task time failed", K(ret), K(task_id));
}
}
return ret;
}
int ObDDLTaskHeartBeatMananger::remove_task(const int64_t task_id)
int ObDDLTaskHeartBeatMananger::remove_task(const ObDDLTaskID &task_id)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObManagerRegisterHeartBeatTask not inited", K(ret));
} else if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_INFO("invalid argument", K(ret), K(task_id));
} else {
ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id);
ObBucketHashWLockGuard lock_guard(bucket_lock_, task_id.task_id_);
if (OB_FAIL(register_task_time_.erase_refactored(task_id))) {
LOG_WARN("remove register task time failed", K(ret));
}
@ -378,7 +385,7 @@ int ObDDLTaskHeartBeatMananger::remove_task(const int64_t task_id)
return ret;
}
int ObDDLTaskHeartBeatMananger::get_inactive_ddl_task_ids(ObArray<int64_t>& remove_task_ids)
int ObDDLTaskHeartBeatMananger::get_inactive_ddl_task_ids(ObArray<ObDDLTaskID>& remove_task_ids)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
@ -392,7 +399,7 @@ int ObDDLTaskHeartBeatMananger::get_inactive_ddl_task_ids(ObArray<int64_t>& remo
ret = OB_SUCCESS;
}
} else {
for (common::hash::ObHashMap<int64_t, int64_t>::iterator it = register_task_time_.begin(); OB_SUCC(ret) && it != register_task_time_.end(); it++) {
for (common::hash::ObHashMap<ObDDLTaskID, int64_t>::iterator it = register_task_time_.begin(); OB_SUCC(ret) && it != register_task_time_.end(); it++) {
if (ObTimeUtility::current_time() - it->second > TIME_OUT_THRESHOLD) {
if (OB_FAIL(remove_task_ids.push_back(it->first))) {
LOG_WARN("remove_task_ids push_back task_id fail", K(ret), K(it->first));
@ -876,26 +883,48 @@ int ObDDLScheduler::prepare_alter_table_arg(const ObPrepareAlterTableArgParam &p
return ret;
}
int ObDDLScheduler::abort_redef_table(const int64_t task_id)
template<typename F>
int ObDDLScheduler::update_task_info(const ObDDLTaskID &task_id,
ObMySQLTransaction &trans,
ObDDLTaskRecord &task_record,
ObTableRedefinitionTask *ddl_task,
common::ObArenaAllocator &allocator,
F &&modify_info)
{
int ret = OB_SUCCESS;
share::ObTaskId trace_id;
ObDDLTask *ddl_task = nullptr;
if (OB_UNLIKELY(task_id <= 0)) {
if (OB_UNLIKELY(!task_id.is_valid() || !task_record.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(task_queue_.abort_task(task_id))) {
LOG_WARN("abort redef table task failed", K(ret), K(task_id));
if (OB_ENTRY_NOT_EXIST == ret) {
bool exist = false;
if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(root_service_->get_sql_proxy(), task_id, exist))) {
LOG_WARN("check task id exist fail", K(ret), K(task_id));
} else {
if (exist) {
ret = OB_EAGAIN;
LOG_INFO("entry exist, the ddl scheduler hasn't recovered the task yet", K(ret), K(task_id));
} else {
LOG_WARN("this task does not exist in the hash table", K(ret), K(task_id));
LOG_WARN("task_id is invalid", K(ret), K(task_id), K(task_record));
} else if (OB_FAIL(ddl_task->init(task_record))) {
LOG_WARN("ddl_task init failed", K(ret));
} else if (OB_FAIL(ddl_task->set_trace_id(task_record.trace_id_))) {
LOG_WARN("set trace id failed", K(ret));
} else if (OB_FAIL(ddl_task->convert_to_record(task_record, allocator))) {
LOG_WARN("convert to ddl task record failed", K(ret), KP(ddl_task));
} else if (OB_UNLIKELY(!task_record.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl task record is invalid", K(ret), K(task_record));
} else {
ObString message;
message.assign(task_record.message_.ptr(), task_record.message_.length());
if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans, task_id.tenant_id_, task_id.task_id_, message))) {
LOG_WARN("update task message failed", K(ret), K(task_id.tenant_id_), K(task_id.task_id_), K(message));
} else {
if (OB_SUCC(ret)) {
if (OB_FAIL(modify_info())) {
if (OB_ENTRY_NOT_EXIST == ret) {
bool exist = false;
if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(root_service_->get_sql_proxy(), task_id.task_id_, exist))) {
LOG_WARN("check task id exist fail", K(ret), K(task_id));
} else {
if (exist) {
ret = OB_EAGAIN;
LOG_INFO("entry exist, the ddl scheduler hasn't recovered the task yet", K(ret), K(task_id));
} else {
LOG_WARN("this task does not exist int hash table", K(ret), K(task_id));
}
}
}
}
}
}
@ -903,8 +932,64 @@ int ObDDLScheduler::abort_redef_table(const int64_t task_id)
return ret;
}
int ObDDLScheduler::copy_table_dependents(const int64_t task_id,
const uint64_t tenant_id,
int ObDDLScheduler::abort_redef_table(const ObDDLTaskID &task_id)
{
int ret = OB_SUCCESS;
share::ObTaskId trace_id;
ObDDLTask *ddl_task = nullptr;
int64_t table_task_status = 0;
int64_t table_execution_id = 0;
ObMySQLTransaction trans;
if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), task_id.tenant_id_))) {
LOG_WARN("start transaction failed", K(ret), K_(task_id.tenant_id));
} else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans,
task_id.tenant_id_,
task_id.task_id_,
table_task_status,
table_execution_id))) {
LOG_WARN("select for update failed", K(ret), K(task_id.tenant_id_), K(task_id.task_id_));
} else {
HEAP_VAR(ObTableRedefinitionTask, redefinition_task) {
ObDDLTaskRecord task_record;
common::ObArenaAllocator allocator(lib::ObLabel("abort_task"));
task_record.reset();
if (OB_FAIL(task_queue_.modify_task(task_id, [&task_record, &allocator](ObDDLTask &task) -> int {
int ret = OB_SUCCESS;
ObTableRedefinitionTask *table_redefinition_task = static_cast<ObTableRedefinitionTask*>(&task);
if (OB_UNLIKELY(!table_redefinition_task->is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("table rdefinition task is not valid", K(ret));
} else if (OB_FAIL(table_redefinition_task->convert_to_record(task_record, allocator))) {
LOG_WARN("convert to ddl task record failed", K(ret), K(*table_redefinition_task));
}
return ret;
}))) {
LOG_WARN("failed to modify task", K(ret));
} else {
redefinition_task.set_is_abort(true);
if (OB_FAIL(update_task_info(task_id, trans, task_record,
&redefinition_task,
allocator,
[&task_id, this]() -> int { return task_queue_.abort_task(task_id); }))) {
LOG_WARN("update task info failed", K(ret), K(task_id));
}
}
}
}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = trans.end(commit);
if (OB_SUCCESS != tmp_ret) {
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
}
return ret;
}
int ObDDLScheduler::copy_table_dependents(const ObDDLTaskID &task_id,
const bool is_copy_constraints,
const bool is_copy_indexes,
const bool is_copy_triggers,
@ -915,20 +1000,18 @@ int ObDDLScheduler::copy_table_dependents(const int64_t task_id,
ObDDLTask *task = nullptr;
int64_t table_task_status = 0;
int64_t table_execution_id = 0;
int64_t pos = 0;
ObString message;
ObMySQLTransaction trans;
if (OB_UNLIKELY(0 >= task_id || OB_INVALID_ID == tenant_id)) {
if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id), K(tenant_id));
} else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), tenant_id))) {
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), task_id.tenant_id_))) {
LOG_WARN("start transaction failed", K(ret));
} else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans,
tenant_id,
task_id,
task_id.tenant_id_,
task_id.task_id_,
table_task_status,
table_execution_id))) {
LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id));
LOG_WARN("select for update failed", K(ret), K(task_id.tenant_id_), K(task_id.task_id_));
} else {
HEAP_VAR(ObTableRedefinitionTask, redefinition_task) {
ObDDLTaskRecord task_record;
@ -948,92 +1031,59 @@ int ObDDLScheduler::copy_table_dependents(const int64_t task_id,
LOG_WARN("failed to modify task", K(ret));
if (OB_ENTRY_NOT_EXIST == ret) {
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id, root_service_->get_sql_proxy(), allocator, task_record))) {
LOG_WARN("get single ddl task failed", K(tmp_ret), K(task_id));
if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id.task_id_, root_service_->get_sql_proxy(), allocator, task_record))) {
LOG_WARN("get single ddl task failed", K(tmp_ret), K(task_id.task_id_));
} else if (OB_TMP_FAIL(schedule_ddl_task(task_record))) {
LOG_WARN("failed to schedule ddl task", K(tmp_ret), K(task_record));
} else {
ret = OB_SUCCESS;
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(redefinition_task.init(task_record))) {
LOG_WARN("init table redefinition task failed", K(ret));
} else if (OB_FAIL(redefinition_task.set_trace_id(task_record.trace_id_))) {
LOG_WARN("set trace id failed", K(ret));
} else {
redefinition_task.set_is_copy_constraints(is_copy_constraints);
redefinition_task.set_is_copy_indexes(is_copy_indexes);
redefinition_task.set_is_copy_triggers(is_copy_triggers);
redefinition_task.set_is_copy_foreign_keys(is_copy_foreign_keys);
redefinition_task.set_is_ignore_errors(is_ignore_errors);
if (OB_FAIL(redefinition_task.convert_to_record(task_record, allocator))) {
LOG_WARN("convert to ddl task record failed", K(ret), K(redefinition_task));
} else if (OB_UNLIKELY(!task_record.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl task record is invalid", K(ret), K(task_record));
} else {
message.assign(task_record.message_.ptr(), task_record.message_.length());
if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans, tenant_id, task_id, message))) {
LOG_WARN("update task message failed", K(ret), K(tenant_id), K(task_id), K(message));
} else {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = trans.end(commit);
if (OB_SUCCESS != tmp_ret) {
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(task_queue_.update_task_copy_deps_setting(task_id,
is_copy_constraints,
is_copy_indexes,
is_copy_triggers,
is_copy_foreign_keys,
is_ignore_errors))) {
if (OB_ENTRY_NOT_EXIST == ret) {
bool exist = false;
if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(root_service_->get_sql_proxy(), task_id, exist))) {
LOG_WARN("check task id exist fail", K(ret), K(task_id));
} else {
if (exist) {
ret = OB_EAGAIN;
LOG_INFO("entry exist, the ddl scheduler hasn't recovered the task yet", K(ret), K(task_id));
} else {
LOG_WARN("this task does not exist in the hash table", K(ret), K(task_id));
}
}
}
LOG_WARN("update task copy deps setting failed", K(ret), K(task_id));
}
}
}
if (OB_FAIL(update_task_info(task_id, trans, task_record,
&redefinition_task,
allocator,
[&task_id, &is_copy_constraints, &is_copy_indexes, &is_copy_triggers, &is_copy_foreign_keys,
&is_ignore_errors, this]() -> int { return task_queue_.update_task_copy_deps_setting(task_id,
is_copy_constraints, is_copy_indexes, is_copy_triggers, is_copy_foreign_keys, is_ignore_errors); }))) {
LOG_WARN("update task info failed", K(ret), K(task_id));
}
}
}
}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = trans.end(commit);
if (OB_SUCCESS != tmp_ret) {
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
}
return ret;
}
int ObDDLScheduler::finish_redef_table(const int64_t task_id, const uint64_t tenant_id)
int ObDDLScheduler::finish_redef_table(const ObDDLTaskID &task_id)
{
int ret = OB_SUCCESS;
ObDDLTask *task = nullptr;
int64_t table_task_status = 0;
int64_t table_execution_id = 0;
int64_t pos = 0;
ObString message;
ObMySQLTransaction trans;
if (OB_UNLIKELY(0 >= task_id || OB_INVALID_ID == tenant_id)) {
if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id), K(tenant_id));
} else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), tenant_id))) {
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(trans.start(&root_service_->get_sql_proxy(), task_id.tenant_id_))) {
LOG_WARN("start transaction failed", K(ret));
} else if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans,
tenant_id,
task_id,
task_id.tenant_id_,
task_id.task_id_,
table_task_status,
table_execution_id))) {
LOG_WARN("select for update failed", K(ret), K(tenant_id), K(task_id));
LOG_WARN("select for update failed", K(ret), K(task_id));
} else {
HEAP_VAR(ObTableRedefinitionTask, redefinition_task) {
ObDDLTaskRecord task_record;
@ -1054,58 +1104,32 @@ int ObDDLScheduler::finish_redef_table(const int64_t task_id, const uint64_t ten
if (OB_ENTRY_NOT_EXIST == ret) {
int tmp_ret = OB_SUCCESS;
ObSqlString sql_string;
if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id, root_service_->get_sql_proxy(), allocator, task_record))) {
LOG_WARN("get single ddl task failed", K(tmp_ret), K(task_id));
if (OB_TMP_FAIL(ObDDLTaskRecordOperator::get_ddl_task_record(task_id.task_id_, root_service_->get_sql_proxy(), allocator, task_record))) {
LOG_WARN("get single ddl task failed", K(tmp_ret), K(task_id.task_id_));
} else if (OB_TMP_FAIL(schedule_ddl_task(task_record))) {
LOG_WARN("failed to schedule ddl task", K(tmp_ret), K(task_record));
} else {
ret = OB_SUCCESS;
}
}
} else if (OB_FAIL(redefinition_task.init(task_record))) {
LOG_WARN("init table redefinition task failed", K(ret));
} else if (OB_FAIL(redefinition_task.set_trace_id(task_record.trace_id_))) {
LOG_WARN("set trace id failed", K(ret));
} else {
redefinition_task.set_is_do_finish(true);
if (OB_FAIL(redefinition_task.convert_to_record(task_record, allocator))) {
LOG_WARN("convert to ddl task record failed", K(ret), K(redefinition_task));
} else if (OB_UNLIKELY(!task_record.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl task record is invalid", K(ret), K(task_record));
} else {
message.assign(task_record.message_.ptr(), task_record.message_.length());
if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans, tenant_id, task_id, message))) {
LOG_WARN("update task message failed", K(ret), K(tenant_id), K(task_id), K(message));
} else {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = trans.end(commit);
if (OB_SUCCESS != tmp_ret) {
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(task_queue_.update_task_process_schedulable(task_id))) {
if (OB_ENTRY_NOT_EXIST == ret) {
bool exist = false;
if (OB_FAIL(ObDDLTaskRecordOperator::check_task_id_exist(root_service_->get_sql_proxy(), task_id, exist))) {
LOG_WARN("check task id exist fail", K(ret), K(task_id));
} else {
if (exist) {
ret = OB_EAGAIN;
LOG_INFO("entry exist, the ddl scheduler hasn't recovered the task yet", K(ret), K(task_id));
} else {
LOG_WARN("this task does not exist in the hash table", K(ret), K(task_id));
}
}
}
LOG_WARN("update task process schedulable failed", K(task_id));
}
}
}
if (OB_FAIL(update_task_info(task_id, trans, task_record,
&redefinition_task,
allocator,
[&task_id, this]() -> int { return task_queue_.update_task_process_schedulable(task_id); }))) {
LOG_WARN("update task info failed", K(ret), K(task_id));
}
}
}
}
if (trans.is_started()) {
bool commit = (OB_SUCCESS == ret);
int tmp_ret = trans.end(commit);
if (OB_SUCCESS != tmp_ret) {
ret = (OB_SUCCESS == ret) ? tmp_ret : ret;
}
}
return ret;
}
@ -1590,7 +1614,7 @@ int ObDDLScheduler::recover_task()
int ObDDLScheduler::remove_inactive_ddl_task()
{
int ret = OB_SUCCESS;
ObArray<int64_t> remove_task_ids;
ObArray<ObDDLTaskID> remove_task_ids;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
@ -1600,7 +1624,7 @@ int ObDDLScheduler::remove_inactive_ddl_task()
} else {
LOG_INFO("need remove task", K(remove_task_ids));
for (int64_t i = 0; OB_SUCC(ret) && i < remove_task_ids.size(); i++) {
int64_t remove_task_id = 0;
ObDDLTaskID remove_task_id;
if (OB_FAIL(remove_task_ids.at(i, remove_task_id))) {
LOG_WARN("get remove task id fail", K(ret));
} else if (OB_FAIL(abort_redef_table(remove_task_id))) {
@ -1759,9 +1783,8 @@ int ObDDLScheduler::schedule_table_redefinition_task(const ObDDLTaskRecord &task
if (OB_ENTRY_EXIST != ret) {
LOG_WARN("inner schedule task failed", K(ret), K(*redefinition_task));
}
} else if ((ObDDLType::DDL_DIRECT_LOAD == task_record.ddl_type_
|| ObDDLType::DDL_DIRECT_LOAD_INSERT == task_record.ddl_type_)
&& OB_FAIL(manager_reg_heart_beat_task_.update_task_active_time(task_record.task_id_))) {
} else if (ObDDLType::DDL_DIRECT_LOAD == task_record.ddl_type_
&& OB_FAIL(manager_reg_heart_beat_task_.update_task_active_time(ObDDLTaskID(task_record.tenant_id_, task_record.task_id_)))) {
LOG_WARN("register_task_time recover fail", K(ret));
}
if (OB_FAIL(ret) && nullptr != redefinition_task) {
@ -2096,7 +2119,7 @@ int ObDDLScheduler::on_sstable_complement_job_reply(
}
int ObDDLScheduler::on_ddl_task_finish(
const int64_t parent_task_id,
const ObDDLTaskID &parent_task_id,
const ObDDLTaskKey &child_task_key,
const int ret_code,
const ObCurTraceId::TraceId &parent_task_trace_id)
@ -2105,7 +2128,7 @@ int ObDDLScheduler::on_ddl_task_finish(
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLScheduler has not been inited", K(ret));
} else if (OB_UNLIKELY(parent_task_id <= 0 || !child_task_key.is_valid())) {
} else if (OB_UNLIKELY(!parent_task_id.is_valid() || !child_task_key.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(parent_task_id), K(child_task_key));
} else {
@ -2185,13 +2208,13 @@ void ObDDLScheduler::destroy_all_tasks()
}
}
int ObDDLScheduler::update_ddl_task_active_time(const int64_t task_id)
int ObDDLScheduler::update_ddl_task_active_time(const ObDDLTaskID &task_id)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret));
} else if (task_id <= 0) {
} else if (!task_id.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(manager_reg_heart_beat_task_.update_task_active_time(task_id))) {

View File

@ -62,22 +62,22 @@ public:
template<typename F>
int modify_task(const ObDDLTaskKey &task_key, F &&op);
template<typename F>
int modify_task(const int64_t task_id, F &&op);
int update_task_copy_deps_setting(const int64_t task_id,
int modify_task(const ObDDLTaskID &task_id, F &&op);
int update_task_copy_deps_setting(const ObDDLTaskID &task_id,
const bool is_copy_constraints,
const bool is_copy_indexes,
const bool is_copy_triggers,
const bool is_copy_foreign_keys,
const bool is_ignore_errors);
int update_task_process_schedulable(const int64_t task_id);
int abort_task(const int64_t task_id);
int update_task_process_schedulable(const ObDDLTaskID &task_id);
int abort_task(const ObDDLTaskID &task_id);
int64_t get_task_cnt() const { return task_list_.get_size(); }
void destroy();
private:
typedef common::ObDList<ObDDLTask> TaskList;
typedef common::hash::ObHashMap<ObDDLTaskKey, ObDDLTask *,
common::hash::NoPthreadDefendMode> TaskKeyMap;
typedef common::hash::ObHashMap<int64_t, ObDDLTask *,
typedef common::hash::ObHashMap<ObDDLTaskID, ObDDLTask *,
common::hash::NoPthreadDefendMode> TaskIdMap;
TaskList task_list_;
TaskKeyMap task_map_;
@ -92,12 +92,12 @@ public:
ObDDLTaskHeartBeatMananger();
~ObDDLTaskHeartBeatMananger();
int init();
int update_task_active_time(const int64_t task_id);
int remove_task(const int64_t task_id);
int get_inactive_ddl_task_ids(ObArray<int64_t>& remove_task_ids);
int update_task_active_time(const ObDDLTaskID &task_id);
int remove_task(const ObDDLTaskID &task_id);
int get_inactive_ddl_task_ids(ObArray<ObDDLTaskID>& remove_task_ids);
private:
static const int64_t BUCKET_LOCK_BUCKET_CNT = 10243L;
common::hash::ObHashMap<int64_t, int64_t> register_task_time_;
common::hash::ObHashMap<ObDDLTaskID, int64_t> register_task_time_;
bool is_inited_;
common::ObBucketLock bucket_lock_;
};
@ -194,7 +194,7 @@ public:
const ObDDLTaskInfo &addition_info);
int on_ddl_task_finish(
const int64_t parent_task_id,
const ObDDLTaskID &parent_task_id,
const ObDDLTaskKey &task_key,
const int ret_code,
const ObCurTraceId::TraceId &parent_task_trace_id);
@ -203,18 +203,25 @@ public:
const ObDDLTaskKey &task_key,
const uint64_t autoinc_val,
const int ret_code);
int abort_redef_table(const int64_t task_id);
template<typename F>
int update_task_info(const ObDDLTaskID &task_id,
ObMySQLTransaction &trans,
ObDDLTaskRecord &task_record,
ObTableRedefinitionTask *ddl_task,
common::ObArenaAllocator &allocator,
F &&modify_info);
int copy_table_dependents(const int64_t task_id,
const uint64_t tenant_id,
int abort_redef_table(const ObDDLTaskID &task_id);
int copy_table_dependents(const ObDDLTaskID &task_id,
const bool is_copy_constraints,
const bool is_copy_indexes,
const bool is_copy_triggers,
const bool is_copy_foreign_keys,
const bool is_ignore_errors);
int finish_redef_table(const int64_t task_id, const uint64_t tenant_id);
int finish_redef_table(const ObDDLTaskID &task_id);
int start_redef_table(const obrpc::ObStartRedefTableArg &arg, obrpc::ObStartRedefTableRes &res);
int update_ddl_task_active_time(const int64_t task_id);
int update_ddl_task_active_time(const ObDDLTaskID &task_id);
int prepare_alter_table_arg(const ObPrepareAlterTableArgParam &param,
const ObTableSchema *target_table_schema,

View File

@ -85,6 +85,65 @@ int ObDDLTaskKey::assign(const ObDDLTaskKey &other)
return ret;
}
ObDDLTaskID::ObDDLTaskID()
: tenant_id_(OB_INVALID_TENANT_ID), task_id_(0)
{
}
ObDDLTaskID::ObDDLTaskID(const uint64_t tenant_id, const int64_t task_id)
: tenant_id_(tenant_id), task_id_(task_id)
{
}
uint64_t ObDDLTaskID::hash() const
{
uint64_t hash_val = murmurhash(&tenant_id_, sizeof(tenant_id_), 0);
hash_val = murmurhash(&task_id_, sizeof(task_id_), hash_val);
return hash_val;
}
bool ObDDLTaskID::operator==(const ObDDLTaskID &other) const
{
return tenant_id_ == other.tenant_id_ && task_id_ == other.task_id_;
}
bool ObDDLTaskID::operator!=(const ObDDLTaskID &other) const
{
return !(*this == other);
}
int ObDDLTaskID::assign(const ObDDLTaskID &other)
{
int ret = OB_SUCCESS;
if (!other.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(other));
} else {
tenant_id_ = other.tenant_id_;
task_id_ = other.task_id_;
}
return ret;
}
ObDDLTaskSerializeField::ObDDLTaskSerializeField(const int64_t task_version, const int64_t parallelism,
const int64_t data_format_version, const bool is_abort)
{
task_version_ = task_version;
parallelism_ = parallelism;
data_format_version_ = data_format_version;
is_abort_ = is_abort;
}
void ObDDLTaskSerializeField::reset()
{
task_version_ = 0;
parallelism_ = 0;
data_format_version_ = 0;
is_abort_ = false;
}
OB_SERIALIZE_MEMBER(ObDDLTaskSerializeField, task_version_, parallelism_, data_format_version_, is_abort_);
ObCreateDDLTaskParam::ObCreateDDLTaskParam()
: tenant_id_(OB_INVALID_ID), object_id_(OB_INVALID_ID), schema_version_(0), parallelism_(0), parent_task_id_(0),
type_(DDL_INVALID), src_table_schema_(nullptr), dest_table_schema_(nullptr), ddl_arg_(nullptr), allocator_(nullptr)
@ -750,6 +809,44 @@ int ObDDLTask::set_ddl_stmt_str(const ObString &ddl_stmt_str)
return ret;
}
int ObDDLTask::serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const
{
int ret = OB_SUCCESS;
ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, is_abort_);
if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_size));
} else if (OB_FAIL(serialize_field.serialize(buf, buf_size, pos))) {
LOG_WARN("serialize_field serialize failed", K(ret));
}
return ret;
}
int ObDDLTask::deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos)
{
int ret = OB_SUCCESS;
ObDDLTaskSerializeField serialize_field;
serialize_field.reset();
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(tenant_id), K(buf_size));
} else if (OB_FAIL(serialize_field.deserialize(buf, buf_size, pos))) {
LOG_WARN("serialize_field deserialize failed", K(ret));
} else {
task_version_ = serialize_field.task_version_;
parallelism_ = serialize_field.parallelism_;
data_format_version_ = serialize_field.data_format_version_;
is_abort_ = serialize_field.is_abort_;
}
return ret;
}
int64_t ObDDLTask::get_serialize_param_size() const
{
ObDDLTaskSerializeField serialize_field(task_version_, parallelism_, data_format_version_, is_abort_);
return serialize_field.get_serialize_size();
}
int ObDDLTask::convert_to_record(
ObDDLTaskRecord &task_record,
common::ObIAllocator &allocator)
@ -829,8 +926,8 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable
ObDDLTaskStatus real_new_status = new_status;
const ObDDLTaskStatus old_status = task_status_;
const bool error_need_retry = OB_SUCCESS != ret_code && is_error_need_retry(ret_code);
if (OB_TMP_FAIL(SYS_TASK_STATUS_MGR.is_task_cancel(trace_id_, is_cancel))) {
LOG_WARN("check task is canceled", K(tmp_ret), K(trace_id_));
if (OB_TMP_FAIL(check_ddl_task_is_cancel(trace_id_, is_cancel))) {
LOG_WARN("check ddl task is cancel failed", K(tmp_ret), K_(trace_id));
}
if (is_cancel) {
real_ret_code = (OB_SUCCESS == ret_code || error_need_retry) ? OB_CANCELED : ret_code;
@ -1039,6 +1136,20 @@ int ObDDLTask::report_error_code(const ObString &forward_user_message, const int
return ret;
}
int ObDDLTask::check_ddl_task_is_cancel(const TraceId &trace_id, bool &is_cancel)
{
int ret = OB_SUCCESS;
if (get_is_abort()) {
is_cancel = true;
} else if (trace_id.is_invalid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("trace id is invalid", K(ret), K(trace_id));
} else if (OB_FAIL(SYS_TASK_STATUS_MGR.is_task_cancel(trace_id, is_cancel))) {
LOG_WARN("failed to check task is cancel", K(ret), K(trace_id));
}
return ret;
}
// wait trans end, but not hold snapshot.
int ObDDLTask::wait_trans_end(
ObDDLWaitTransEndCtx &wait_trans_ctx,

View File

@ -46,6 +46,23 @@ public:
int64_t schema_version_;
};
struct ObDDLTaskID final
{
public:
ObDDLTaskID();
ObDDLTaskID(const uint64_t tenant_id, const int64_t task_id);
~ObDDLTaskID() = default;
uint64_t hash() const;
bool operator==(const ObDDLTaskID &other) const;
bool operator!=(const ObDDLTaskID &other) const;
bool is_valid() const { return OB_INVALID_TENANT_ID != tenant_id_ && task_id_ > 0; }
int assign(const ObDDLTaskID &other);
TO_STRING_KV(K_(tenant_id), K_(task_id));
public:
uint64_t tenant_id_;
int64_t task_id_;
};
struct ObDDLTaskRecord final
{
public:
@ -87,6 +104,22 @@ public:
int64_t row_inserted_;
};
struct ObDDLTaskSerializeField final
{
OB_UNIS_VERSION(1);
public:
TO_STRING_KV(K_(task_version), K_(parallelism), K_(data_format_version), K_(is_abort));
ObDDLTaskSerializeField() : task_version_(0), parallelism_(0), data_format_version_(0), is_abort_(false) {}
ObDDLTaskSerializeField(const int64_t task_version, const int64_t parallelism,
const int64_t data_format_version, const bool is_abort);
~ObDDLTaskSerializeField() = default;
void reset();
public:
int64_t task_version_;
int64_t parallelism_;
int64_t data_format_version_;
bool is_abort_;
};
struct ObCreateDDLTaskParam final
{
public:
@ -384,7 +417,7 @@ class ObDDLTask : public common::ObDLinkBase<ObDDLTask>
{
public:
explicit ObDDLTask(const share::ObDDLType task_type)
: lock_(), ddl_tracing_(this), is_inited_(false), need_retry_(true), is_running_(false),
: lock_(), ddl_tracing_(this), is_inited_(false), need_retry_(true), is_running_(false), is_abort_(false),
task_type_(task_type), trace_id_(), tenant_id_(0), object_id_(0), schema_version_(0),
target_object_id_(0), task_status_(share::ObDDLTaskStatus::PREPARE), snapshot_version_(0), ret_code_(OB_SUCCESS), task_id_(0),
parent_task_id_(0), parent_task_key_(), task_version_(0), parallelism_(0),
@ -402,6 +435,8 @@ public:
share::ObDDLType get_task_type() const { return task_type_; }
void set_not_running() { ATOMIC_SET(&is_running_, false); }
void set_task_status(const share::ObDDLTaskStatus new_status) {task_status_ = new_status; }
void set_is_abort(const bool is_abort) { is_abort_ = is_abort; }
bool get_is_abort() { return is_abort_; }
bool try_set_running() { return !ATOMIC_CAS(&is_running_, false, true); }
uint64_t get_tenant_id() const { return tenant_id_; }
uint64_t get_object_id() const { return object_id_; }
@ -412,6 +447,7 @@ public:
int get_ddl_type_str(const int64_t ddl_type, const char *&ddl_type_str);
int64_t get_ret_code() const { return ret_code_; }
int64_t get_task_id() const { return task_id_; }
ObDDLTaskID get_ddl_task_id() const { return ObDDLTaskID(tenant_id_, task_id_); }
ObDDLTaskKey get_task_key() const { return ObDDLTaskKey(target_object_id_, schema_version_); }
int64_t get_parent_task_id() const { return parent_task_id_; }
int64_t get_task_version() const { return task_version_; }
@ -423,9 +459,9 @@ public:
share::ObDDLLongopsStat *get_longops_stat() const { return longops_stat_; }
int64_t get_data_format_version() const { return data_format_version_; }
static int fetch_new_task_id(ObMySQLProxy &sql_proxy, int64_t &new_task_id);
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const = 0;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos) = 0;
virtual int64_t get_serialize_param_size() const = 0;
virtual int serialize_params_to_message(char *buf, const int64_t buf_size, int64_t &pos) const;
virtual int deserlize_params_from_message(const uint64_t tenant_id, const char *buf, const int64_t buf_size, int64_t &pos);
virtual int64_t get_serialize_param_size() const;
const ObString &get_ddl_stmt_str() const { return ddl_stmt_str_; }
int set_ddl_stmt_str(const ObString &ddl_stmt_str);
int convert_to_record(ObDDLTaskRecord &task_record, common::ObIAllocator &allocator);
@ -434,6 +470,7 @@ public:
int refresh_schema_version();
int remove_task_record();
int report_error_code(const ObString &forward_user_message, const int64_t affected_rows = 0);
int check_ddl_task_is_cancel(const TraceId &trace_id, bool &is_cancel);
int wait_trans_end(
ObDDLWaitTransEndCtx &wait_trans_ctx,
const share::ObDDLTaskStatus next_task_status);
@ -507,6 +544,7 @@ protected:
bool is_inited_;
bool need_retry_;
bool is_running_;
bool is_abort_;
share::ObDDLType task_type_;
TraceId trace_id_;
uint64_t tenant_id_;

View File

@ -289,7 +289,8 @@ int ObDropIndexTask::cleanup_impl()
}
if (OB_SUCC(ret) && parent_task_id_ > 0) {
root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id_, get_task_key(), ret_code_, trace_id_);
const ObDDLTaskID parent_task_id(tenant_id_, parent_task_id_);
root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id, get_task_key(), ret_code_, trace_id_);
}
LOG_INFO("clean task finished", K(ret), K(*this));
return ret;
@ -412,6 +413,8 @@ int ObDropIndexTask::serialize_params_to_message(char *buf, const int64_t buf_si
if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), KP(buf), K(buf_size));
} else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_size, pos))) {
LOG_WARN("ObDDLTask serialize failed", K(ret));
} else if (OB_FAIL(drop_index_arg_.serialize(buf, buf_size, pos))) {
LOG_WARN("serialize failed", K(ret));
}
@ -424,7 +427,9 @@ int ObDropIndexTask::deserlize_params_from_message(const uint64_t tenant_id, con
obrpc::ObDropIndexArg tmp_drop_index_arg;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || buf_size <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(buf_size));
LOG_WARN("invalid arg", K(ret), K(tenant_id), KP(buf), K(buf_size));
} else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, buf_size, pos))) {
LOG_WARN("ObDDLTask deserlize failed", K(ret));
} else if (OB_FAIL(tmp_drop_index_arg.deserialize(buf, buf_size, pos))) {
LOG_WARN("deserialize failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_drop_index_arg))) {
@ -437,7 +442,7 @@ int ObDropIndexTask::deserlize_params_from_message(const uint64_t tenant_id, con
int64_t ObDropIndexTask::get_serialize_param_size() const
{
return drop_index_arg_.get_serialize_size();
return drop_index_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size();
}
void ObDropIndexTask::flt_set_task_span_tag() const

View File

@ -1356,7 +1356,8 @@ int ObIndexBuildTask::cleanup_impl()
}
if (OB_SUCC(ret) && parent_task_id_ > 0) {
root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id_, get_task_key(), ret_code_, trace_id_);
const ObDDLTaskID parent_task_id(tenant_id_, parent_task_id_);
root_service_->get_ddl_task_scheduler().on_ddl_task_finish(parent_task_id, get_task_key(), ret_code_, trace_id_);
}
LOG_INFO("clean task finished", K(ret), K(*this));
return ret;
@ -1470,13 +1471,12 @@ int ObIndexBuildTask::serialize_params_to_message(char *buf, const int64_t buf_l
if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) {
LOG_WARN("fail to serialize task version", K(ret), K(task_version_));
} else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) {
LOG_WARN("ObDDLTask serialize failed", K(ret));
} else if (OB_FAIL(create_index_arg_.serialize(buf, buf_len, pos))) {
LOG_WARN("serialize create index arg failed", K(ret));
} else {
LST_DO_CODE(OB_UNIS_ENCODE, check_unique_snapshot_);
LST_DO_CODE(OB_UNIS_ENCODE, parallelism_, data_format_version_);
}
return ret;
}
@ -1488,8 +1488,8 @@ int ObIndexBuildTask::deserlize_params_from_message(const uint64_t tenant_id, co
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos)) {
LOG_WARN("ObDDLTask deserlize failed", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
@ -1498,7 +1498,6 @@ int ObIndexBuildTask::deserlize_params_from_message(const uint64_t tenant_id, co
LOG_WARN("deep copy create index arg failed", K(ret));
} else {
LST_DO_CODE(OB_UNIS_DECODE, check_unique_snapshot_);
LST_DO_CODE(OB_UNIS_DECODE, parallelism_, data_format_version_);
}
return ret;
}
@ -1507,7 +1506,5 @@ int64_t ObIndexBuildTask::get_serialize_param_size() const
{
return create_index_arg_.get_serialize_size()
+ serialization::encoded_length_i64(check_unique_snapshot_)
+ serialization::encoded_length_i64(task_version_)
+ serialization::encoded_length_i64(parallelism_)
+ serialization::encoded_length_i64(data_format_version_);
+ ObDDLTask::get_serialize_param_size();
}

View File

@ -47,6 +47,7 @@ public:
int set_nls_format(const ObString &nls_date_format,
const ObString &nls_timestamp_format,
const ObString &nls_timestamp_tz_format);
ObDDLTaskID get_ddl_task_id() { return ObDDLTaskID(tenant_id_, task_id_); }
virtual int process() override;
virtual int64_t get_deep_copy_size() const override { return sizeof(*this); }
virtual ObAsyncTask *deep_copy(char *buf, const int64_t buf_size) const override;

View File

@ -591,8 +591,8 @@ int ObModifyAutoincTask::serialize_params_to_message(char *buf, const int64_t bu
if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) {
LOG_WARN("fail to serialize task version", K(ret), K(task_version_));
} else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) {
LOG_WARN("fail to serialize ObDDLTask", K(ret));
} else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) {
LOG_WARN("serialize table arg failed", K(ret));
}
@ -606,8 +606,8 @@ int ObModifyAutoincTask::deserlize_params_from_message(const uint64_t tenant_id,
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos))) {
LOG_WARN("ObDDLTask deserlize failed", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
@ -620,7 +620,7 @@ int ObModifyAutoincTask::deserlize_params_from_message(const uint64_t tenant_id,
int64_t ObModifyAutoincTask::get_serialize_param_size() const
{
return alter_table_arg_.get_serialize_size() + serialization::encoded_length_i64(task_version_);
return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size();
}
void ObModifyAutoincTask::flt_set_task_span_tag() const

View File

@ -928,8 +928,7 @@ int64_t ObTableRedefinitionTask::get_serialize_param_size() const
int8_t copy_foreign_keys = static_cast<int8_t>(is_copy_foreign_keys_);
int8_t ignore_errors = static_cast<int8_t>(is_ignore_errors_);
int8_t do_finish = static_cast<int8_t>(is_do_finish_);
return alter_table_arg_.get_serialize_size() + serialization::encoded_length_i64(task_version_)
+ serialization::encoded_length_i64(parallelism_) + serialization::encoded_length_i64(data_format_version_)
return alter_table_arg_.get_serialize_size() + ObDDLTask::get_serialize_param_size()
+ serialization::encoded_length_i8(copy_indexes) + serialization::encoded_length_i8(copy_triggers)
+ serialization::encoded_length_i8(copy_constraints) + serialization::encoded_length_i8(copy_foreign_keys)
+ serialization::encoded_length_i8(ignore_errors) + serialization::encoded_length_i8(do_finish);
@ -947,14 +946,10 @@ int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_
if (OB_UNLIKELY(nullptr == buf || buf_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(buf), K(buf_len));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, task_version_))) {
LOG_WARN("fail to serialize task version", K(ret), K(task_version_));
} else if (OB_FAIL(ObDDLTask::serialize_params_to_message(buf, buf_len, pos))) {
LOG_WARN("ObDDLTask serialize failed", K(ret));
} else if (OB_FAIL(alter_table_arg_.serialize(buf, buf_len, pos))) {
LOG_WARN("serialize table arg failed", K(ret));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, parallelism_))) {
LOG_WARN("fail to serialize parallelism_", K(ret));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, data_format_version_))) {
LOG_WARN("fail to serialize parallelism_", K(ret));
} else if (OB_FAIL(serialization::encode_i8(buf, buf_len, pos, copy_indexes))) {
LOG_WARN("fail to serialize is_copy_indexes", K(ret));
} else if (OB_FAIL(serialization::encode_i8(buf, buf_len, pos, copy_triggers))) {
@ -986,18 +981,14 @@ int ObTableRedefinitionTask::deserlize_params_from_message(const uint64_t tenant
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || nullptr == buf || data_len <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(tenant_id), KP(buf), K(data_len));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &task_version_))) {
LOG_WARN("fail to deserialize task version", K(ret));
} else if (OB_FAIL(ObDDLTask::deserlize_params_from_message(tenant_id, buf, data_len, pos))) {
LOG_WARN("ObDDLTask deserlize failed", K(ret));
} else if (OB_FAIL(tmp_arg.deserialize(buf, data_len, pos))) {
LOG_WARN("serialize table failed", K(ret));
} else if (OB_FAIL(ObDDLUtil::replace_user_tenant_id(tenant_id, tmp_arg))) {
LOG_WARN("replace user tenant id failed", K(ret), K(tenant_id), K(tmp_arg));
} else if (OB_FAIL(deep_copy_table_arg(allocator_, tmp_arg, alter_table_arg_))) {
LOG_WARN("deep copy table arg failed", K(ret));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &parallelism_))) {
LOG_WARN("fail to deserialize parallelism", K(ret));
} else if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &data_format_version_))) {
LOG_WARN("fail to deserialize data format version", K(ret));
} else if (pos < data_len) {
if (OB_FAIL(serialization::decode_i8(buf, data_len, pos, &copy_indexes))) {
LOG_WARN("fail to deserialize is_copy_indexes_", K(ret));

View File

@ -3910,8 +3910,8 @@ int ObRootService::update_ddl_task_active_time(const obrpc::ObUpdateDDLTaskActiv
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(arg));
} else if (OB_FAIL(ddl_scheduler_.update_ddl_task_active_time(task_id))) {
LOG_WARN("fail to set RegTaskTime map", K(ret), K(task_id));
} else if (OB_FAIL(ddl_scheduler_.update_ddl_task_active_time(ObDDLTaskID(tenant_id, task_id)))) {
LOG_WARN("fail to set RegTaskTime map", K(ret), K(tenant_id), K(task_id));
}
return ret;
}
@ -3934,8 +3934,8 @@ int ObRootService::abort_redef_table(const obrpc::ObAbortRedefTableArg &arg)
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(arg));
} else if (OB_FAIL(ddl_scheduler_.abort_redef_table(task_id))) {
LOG_WARN("cancel task failed", K(ret), K(task_id));
} else if (OB_FAIL(ddl_scheduler_.abort_redef_table(ObDDLTaskID(tenant_id, task_id)))) {
LOG_WARN("cancel task failed", K(ret), K(tenant_id), K(task_id));
}
return ret;
}
@ -3958,7 +3958,7 @@ int ObRootService::finish_redef_table(const obrpc::ObFinishRedefTableArg &arg)
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(arg));
} else if (OB_FAIL(ddl_scheduler_.finish_redef_table(task_id, tenant_id))) {
} else if (OB_FAIL(ddl_scheduler_.finish_redef_table(ObDDLTaskID(tenant_id, task_id)))) {
LOG_WARN("failed to finish redef table", K(ret), K(task_id), K(tenant_id));
}
return ret;
@ -3987,8 +3987,7 @@ int ObRootService::copy_table_dependents(const obrpc::ObCopyTableDependentsArg &
} else if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(arg));
} else if (OB_FAIL(ddl_scheduler_.copy_table_dependents(task_id,
tenant_id,
} else if (OB_FAIL(ddl_scheduler_.copy_table_dependents(ObDDLTaskID(tenant_id, task_id),
is_copy_constraints,
is_copy_indexes,
is_copy_triggers,