fix direct load ret code loss

This commit is contained in:
Charles0429
2023-09-21 04:40:17 +00:00
committed by ob-robot
parent d23c31fd93
commit 7317c0902f
4 changed files with 120 additions and 2 deletions

View File

@ -237,6 +237,30 @@ int ObDDLTaskQueue::modify_task(const ObDDLTaskID &task_id, F &&op)
return ret; return ret;
} }
template<typename F>
int ObDDLTaskQueue::get_task(const ObDDLTaskKey &task_key, F &&op)
{
int ret = OB_SUCCESS;
common::ObSpinLockGuard guard(lock_);
ObDDLTask *task = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = common::OB_NOT_INIT;
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
} else if (OB_UNLIKELY(!task_key.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), K(task_key));
} else if (OB_FAIL(task_map_.get_refactored(task_key, task))) {
ret = OB_HASH_NOT_EXIST == ret ? OB_ENTRY_NOT_EXIST : ret;
LOG_WARN("get from task map failed", K(ret), K(task_key));
} else if (OB_ISNULL(task)) {
ret = OB_ERR_SYS;
LOG_WARN("invalid task", K(ret), K(task_key));
} else if (OB_FAIL(op(*task))) {
LOG_WARN("failed to do task callback", K(ret));
}
return ret;
}
int ObDDLTaskQueue::update_task_copy_deps_setting(const ObDDLTaskID &task_id, int ObDDLTaskQueue::update_task_copy_deps_setting(const ObDDLTaskID &task_id,
const bool is_copy_constraints, const bool is_copy_constraints,
const bool is_copy_indexes, const bool is_copy_indexes,
@ -294,6 +318,35 @@ int ObDDLTaskQueue::update_task_process_schedulable(const ObDDLTaskID &task_id)
return ret; return ret;
} }
int ObDDLTaskQueue::update_task_ret_code(const ObDDLTaskID &task_id, const int ret_code)
{
int ret = OB_SUCCESS;
ObDDLTask *ddl_task = nullptr;
ObTableRedefinitionTask *table_redefinition_task = nullptr;
common::ObSpinLockGuard guard(lock_);
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
} else if (OB_UNLIKELY(!task_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arg", K(ret), K(task_id));
} else if (OB_FAIL(task_id_map_.get_refactored(task_id, ddl_task))) {
ret = OB_HASH_NOT_EXIST == ret ? OB_ENTRY_NOT_EXIST : ret;
LOG_WARN("get from task map failed", K(ret), K(task_id));
} else if (OB_ISNULL(table_redefinition_task = static_cast<ObTableRedefinitionTask*>(ddl_task))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl_task is null", K(ret));
} else {
const ObTabletID unused_tablet_id;
const int64_t unused_snapshot_version = 0;
const int64_t unused_execution_id = 0;
const ObDDLTaskInfo unused_task_info;
ret = table_redefinition_task->update_complete_sstable_job_status(unused_tablet_id, unused_snapshot_version,
unused_execution_id, ret_code, unused_task_info);
}
return ret;
}
int ObDDLTaskQueue::abort_task(const ObDDLTaskID &task_id) int ObDDLTaskQueue::abort_task(const ObDDLTaskID &task_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -580,6 +633,35 @@ int ObFinishRedefCallback::update_task_info_in_queue(ObTableRedefinitionTask& re
return ddl_task_queue.update_task_process_schedulable(ObDDLTaskID(redef_task.get_tenant_id(),redef_task.get_task_id())); return ddl_task_queue.update_task_process_schedulable(ObDDLTaskID(redef_task.get_tenant_id(),redef_task.get_task_id()));
} }
int ObUpdateSSTableCompleteStatusCallback::set_ret_code(const int ret_code)
{
int ret = OB_SUCCESS;
ret_code_ = ret_code;
return ret;
}
int ObUpdateSSTableCompleteStatusCallback::update_redef_task_info(ObTableRedefinitionTask &redef_task)
{
int ret = OB_SUCCESS;
const ObTabletID unused_tablet_id;
const int64_t unused_snapshot_version = 0;
const int64_t unused_execution_id = 0;
const ObDDLTaskInfo unused_task_info;
ret = redef_task.update_complete_sstable_job_status(unused_tablet_id, unused_snapshot_version,
unused_execution_id, ret_code_, unused_task_info);
return ret;
}
int ObUpdateSSTableCompleteStatusCallback::update_task_info_in_queue(ObTableRedefinitionTask &redef_task,
ObDDLTaskQueue &ddl_task_queue)
{
int ret = OB_SUCCESS;
if (OB_FAIL(ddl_task_queue.update_task_ret_code(ObDDLTaskID(redef_task.get_tenant_id(), redef_task.get_task_id()), ret_code_))) {
LOG_WARN("update_task_copy_deps_setting failed", K(ret));
}
return ret;
}
int ObPrepareAlterTableArgParam::init(const uint64_t session_id, int ObPrepareAlterTableArgParam::init(const uint64_t session_id,
const ObSQLMode &sql_mode, const ObSQLMode &sql_mode,
const ObString &ddl_stmt_str, const ObString &ddl_stmt_str,
@ -2275,12 +2357,26 @@ int ObDDLScheduler::on_sstable_complement_job_reply(
const ObDDLTaskInfo &addition_info) const ObDDLTaskInfo &addition_info)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObDDLType ddl_type = DDL_INVALID;
ObDDLTaskID task_id;
if (OB_UNLIKELY(!is_inited_)) { if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret)); LOG_WARN("not init", K(ret));
} else if (OB_UNLIKELY(!(task_key.is_valid() && snapshot_version > 0 && execution_id >= 0))) { } else if (OB_UNLIKELY(!(task_key.is_valid() && snapshot_version > 0 && execution_id >= 0))) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(task_key), K(snapshot_version), K(execution_id), K(ret_code)); LOG_WARN("invalid argument", K(ret), K(task_key), K(snapshot_version), K(execution_id), K(ret_code));
} else if (OB_FAIL(task_queue_.get_task(task_key, [&ddl_type, &task_id](ObDDLTask &task) -> int {
ddl_type = task.get_task_type();
task_id = task.get_ddl_task_id();
return OB_SUCCESS;
}))) {
LOG_WARN("get task failed", K(ret), K(task_key));
} else if (is_direct_load_task(ddl_type)) {
ObUpdateSSTableCompleteStatusCallback callback;
callback.set_ret_code(ret_code);
if (OB_FAIL(modify_redef_task(task_id, callback))) {
LOG_WARN("fail to modify redef task", K(ret), K(task_id));
}
} else if (OB_FAIL(task_queue_.modify_task(task_key, [&tablet_id, &snapshot_version, &execution_id, &ret_code, &addition_info](ObDDLTask &task) -> int { } else if (OB_FAIL(task_queue_.modify_task(task_key, [&tablet_id, &snapshot_version, &execution_id, &ret_code, &addition_info](ObDDLTask &task) -> int {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const int64_t task_type = task.get_task_type(); const int64_t task_type = task.get_task_type();
@ -2301,8 +2397,6 @@ int ObDDLScheduler::on_sstable_complement_job_reply(
case ObDDLType::DDL_MODIFY_COLUMN: case ObDDLType::DDL_MODIFY_COLUMN:
case ObDDLType::DDL_CONVERT_TO_CHARACTER: case ObDDLType::DDL_CONVERT_TO_CHARACTER:
case ObDDLType::DDL_TABLE_REDEFINITION: case ObDDLType::DDL_TABLE_REDEFINITION:
case ObDDLType::DDL_DIRECT_LOAD:
case ObDDLType::DDL_DIRECT_LOAD_INSERT:
if (OB_FAIL(static_cast<ObTableRedefinitionTask *>(&task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) { if (OB_FAIL(static_cast<ObTableRedefinitionTask *>(&task)->update_complete_sstable_job_status(tablet_id, snapshot_version, execution_id, ret_code, addition_info))) {
LOG_WARN("update complete sstable job status", K(ret)); LOG_WARN("update complete sstable job status", K(ret));
} }

View File

@ -65,6 +65,8 @@ public:
int modify_task(const ObDDLTaskKey &task_key, F &&op); int modify_task(const ObDDLTaskKey &task_key, F &&op);
template<typename F> template<typename F>
int modify_task(const ObDDLTaskID &task_id, F &&op); int modify_task(const ObDDLTaskID &task_id, F &&op);
template<typename F>
int get_task(const ObDDLTaskKey &task_key, F &&op);
int update_task_copy_deps_setting(const ObDDLTaskID &task_id, int update_task_copy_deps_setting(const ObDDLTaskID &task_id,
const bool is_copy_constraints, const bool is_copy_constraints,
const bool is_copy_indexes, const bool is_copy_indexes,
@ -72,6 +74,7 @@ public:
const bool is_copy_foreign_keys, const bool is_copy_foreign_keys,
const bool is_ignore_errors); const bool is_ignore_errors);
int update_task_process_schedulable(const ObDDLTaskID &task_id); int update_task_process_schedulable(const ObDDLTaskID &task_id);
int update_task_ret_code(const ObDDLTaskID &task_id, const int ret_code);
int abort_task(const ObDDLTaskID &task_id); int abort_task(const ObDDLTaskID &task_id);
int64_t get_task_cnt() const { return task_list_.get_size(); } int64_t get_task_cnt() const { return task_list_.get_size(); }
void destroy(); void destroy();
@ -200,6 +203,21 @@ public:
ObDDLTaskQueue &ddl_task_queue) override; ObDDLTaskQueue &ddl_task_queue) override;
}; };
class ObUpdateSSTableCompleteStatusCallback : public ObRedefCallback
{
public:
ObUpdateSSTableCompleteStatusCallback()
: ret_code_(OB_SUCCESS)
{}
~ObUpdateSSTableCompleteStatusCallback() = default;
virtual int update_redef_task_info(ObTableRedefinitionTask& redef_task) override;
virtual int update_task_info_in_queue(ObTableRedefinitionTask& redef_task,
ObDDLTaskQueue &ddl_task_queue) override;
int set_ret_code(const int ret_code);
private:
int ret_code_;
};
/* /*
* the only scheduler for all ddl tasks executed in root service * the only scheduler for all ddl tasks executed in root service
* *

View File

@ -189,6 +189,7 @@ int ObTableRedefinitionTask::update_complete_sstable_job_status(const common::Ob
case ObDDLType::DDL_DIRECT_LOAD: case ObDDLType::DDL_DIRECT_LOAD:
case ObDDLType::DDL_DIRECT_LOAD_INSERT: { case ObDDLType::DDL_DIRECT_LOAD_INSERT: {
complete_sstable_job_ret_code_ = ret_code; complete_sstable_job_ret_code_ = ret_code;
ret_code_ = ret_code;
LOG_INFO("table redefinition task callback", K(complete_sstable_job_ret_code_)); LOG_INFO("table redefinition task callback", K(complete_sstable_job_ret_code_));
break; break;
} }

View File

@ -220,6 +220,11 @@ static inline bool is_long_running_ddl(const ObDDLType type)
return is_simple_table_long_running_ddl(type) || is_double_table_long_running_ddl(type); return is_simple_table_long_running_ddl(type) || is_double_table_long_running_ddl(type);
} }
static inline bool is_direct_load_task(const ObDDLType type)
{
return DDL_DIRECT_LOAD == type || DDL_DIRECT_LOAD_INSERT == type;
}
static inline bool is_invalid_ddl_type(const ObDDLType type) static inline bool is_invalid_ddl_type(const ObDDLType type)
{ {
return DDL_INVALID == type; return DDL_INVALID == type;