fix direct load task lost ret code when switch rootservice leader

This commit is contained in:
Charles0429 2024-02-10 07:21:58 +00:00 committed by ob-robot
parent d0bd618d86
commit a780f3b5d5
5 changed files with 49 additions and 11 deletions

View File

@ -513,9 +513,10 @@ int ObRedefCallback::modify_info(ObTableRedefinitionTask &redef_task,
} else {
message.reset();
message.assign(buf, serialize_param_size);
if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans,
if (OB_FAIL(ObDDLTaskRecordOperator::update_ret_code_and_message(trans,
redef_task.get_tenant_id(),
redef_task.get_task_id(),
redef_task.get_ret_code(),
message))) {
LOG_WARN("update task message failed", K(ret), K(redef_task.get_tenant_id()),
K(redef_task.get_task_id()), K(message));
@ -652,13 +653,6 @@ int ObFinishRedefCallback::update_task_info_in_queue(ObTableRedefinitionTask& re
return ddl_task_queue.update_task_process_schedulable(ObDDLTaskID(redef_task.get_tenant_id(),redef_task.get_task_id()));
}
int ObUpdateSSTableCompleteStatusCallback::set_ret_code(const int ret_code)
{
int ret = OB_SUCCESS;
ret_code_ = ret_code;
return ret;
}
int ObUpdateSSTableCompleteStatusCallback::update_redef_task_info(ObTableRedefinitionTask &redef_task)
{
int ret = OB_SUCCESS;

View File

@ -210,13 +210,14 @@ class ObUpdateSSTableCompleteStatusCallback : public ObRedefCallback
{
public:
ObUpdateSSTableCompleteStatusCallback()
: ret_code_(OB_SUCCESS)
: ret_code_(common::OB_SUCCESS)
{}
~ObUpdateSSTableCompleteStatusCallback() = default;
void set_ret_code (const int ret_code) { ret_code_ = ret_code; }
int get_ret_code() const { return ret_code_; }
virtual int update_redef_task_info(ObTableRedefinitionTask& redef_task) override;
virtual int update_task_info_in_queue(ObTableRedefinitionTask& redef_task,
ObDDLTaskQueue &ddl_task_queue) override;
int set_ret_code(const int ret_code);
private:
int ret_code_;
};

View File

@ -2820,6 +2820,34 @@ int ObDDLTaskRecordOperator::update_status_and_message(
return ret;
}
int ObDDLTaskRecordOperator::update_ret_code_and_message(
common::ObISQLClient &proxy,
const uint64_t tenant_id,
const int64_t task_id,
const int ret_code,
ObString &message)
{
int ret = OB_SUCCESS;
ObSqlString sql_string;
ObSqlString message_string;
int64_t affected_rows = 0;
if (OB_UNLIKELY(task_id <= 0 || tenant_id <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(task_id), K(tenant_id));
} else if (OB_FAIL(to_hex_str(message, message_string))) {
LOG_WARN("append hex escaped string failed", K(ret));
} else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET ret_code = %d, message = \"%.*s\" WHERE task_id = %lu",
OB_ALL_DDL_TASK_STATUS_TNAME, ret_code, static_cast<int>(message_string.length()), message_string.ptr(), task_id))) {
LOG_WARN("assign sql string failed", K(ret), K(ret_code), K(task_id));
} else if (OB_FAIL(proxy.write(tenant_id, sql_string.ptr(), affected_rows))) {
LOG_WARN("update status of ddl task record failed", K(ret), K(sql_string));
} else if (OB_UNLIKELY(affected_rows < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows));
}
return ret;
}
int ObDDLTaskRecordOperator::delete_record(common::ObMySQLProxy &proxy, const uint64_t tenant_id, const int64_t task_id)
{
int ret = OB_SUCCESS;

View File

@ -210,6 +210,13 @@ public:
const int64_t task_status,
ObString &message);
static int update_ret_code_and_message(
common::ObISQLClient &proxy,
const uint64_t tenant_id,
const int64_t task_id,
const int ret_code,
ObString &message);
static int delete_record(
common::ObMySQLProxy &proxy,
const uint64_t tenant_id,

View File

@ -1063,7 +1063,8 @@ int64_t ObTableRedefinitionTask::get_serialize_param_size() const
+ serialization::encoded_length_i8(copy_indexes) + serialization::encoded_length_i8(copy_triggers)
+ serialization::encoded_length_i8(copy_constraints) + serialization::encoded_length_i8(copy_foreign_keys)
+ serialization::encoded_length_i8(ignore_errors) + serialization::encoded_length_i8(do_finish)
+ serialization::encoded_length_i64(target_cg_cnt_);
+ serialization::encoded_length_i64(target_cg_cnt_)
+ serialization::encoded_length_i64(complete_sstable_job_ret_code_);
}
int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const
@ -1096,6 +1097,8 @@ int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_
LOG_WARN("fail to serialize is_do_finish", K(ret));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, target_cg_cnt_))) {
LOG_WARN("fail to serialize target_cg_cnt", K(ret));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, complete_sstable_job_ret_code_))) {
LOG_WARN("fail to serialize complete sstable job ret code", K(ret));
}
FLOG_INFO("serialize message for table redefinition", K(ret),
K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this));
@ -1147,6 +1150,11 @@ int ObTableRedefinitionTask::deserlize_params_from_message(const uint64_t tenant
is_ignore_errors_ = static_cast<bool>(ignore_errors);
is_do_finish_ = static_cast<bool>(do_finish);
}
if (OB_SUCC(ret) && pos < data_len) {
if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &complete_sstable_job_ret_code_))) {
LOG_WARN("fail to deserialize is_do_finish_", K(ret));
}
}
}
FLOG_INFO("deserialize message for table redefinition", K(ret),
K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this));