fix direct load task lost ret code when switch rootservice leader

This commit is contained in:
Charles0429
2024-02-03 02:47:06 +00:00
committed by ob-robot
parent 086c79e2e8
commit fcafbd58d7
5 changed files with 49 additions and 11 deletions

View File

@ -513,9 +513,10 @@ int ObRedefCallback::modify_info(ObTableRedefinitionTask &redef_task,
} else { } else {
message.reset(); message.reset();
message.assign(buf, serialize_param_size); message.assign(buf, serialize_param_size);
if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans, if (OB_FAIL(ObDDLTaskRecordOperator::update_ret_code_and_message(trans,
redef_task.get_tenant_id(), redef_task.get_tenant_id(),
redef_task.get_task_id(), redef_task.get_task_id(),
redef_task.get_ret_code(),
message))) { message))) {
LOG_WARN("update task message failed", K(ret), K(redef_task.get_tenant_id()), LOG_WARN("update task message failed", K(ret), K(redef_task.get_tenant_id()),
K(redef_task.get_task_id()), K(message)); K(redef_task.get_task_id()), K(message));
@ -652,13 +653,6 @@ int ObFinishRedefCallback::update_task_info_in_queue(ObTableRedefinitionTask& re
return ddl_task_queue.update_task_process_schedulable(ObDDLTaskID(redef_task.get_tenant_id(),redef_task.get_task_id())); return ddl_task_queue.update_task_process_schedulable(ObDDLTaskID(redef_task.get_tenant_id(),redef_task.get_task_id()));
} }
int ObUpdateSSTableCompleteStatusCallback::set_ret_code(const int ret_code)
{
int ret = OB_SUCCESS;
ret_code_ = ret_code;
return ret;
}
int ObUpdateSSTableCompleteStatusCallback::update_redef_task_info(ObTableRedefinitionTask &redef_task) int ObUpdateSSTableCompleteStatusCallback::update_redef_task_info(ObTableRedefinitionTask &redef_task)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -210,13 +210,14 @@ class ObUpdateSSTableCompleteStatusCallback : public ObRedefCallback
{ {
public: public:
ObUpdateSSTableCompleteStatusCallback() ObUpdateSSTableCompleteStatusCallback()
: ret_code_(OB_SUCCESS) : ret_code_(common::OB_SUCCESS)
{} {}
~ObUpdateSSTableCompleteStatusCallback() = default; ~ObUpdateSSTableCompleteStatusCallback() = default;
void set_ret_code (const int ret_code) { ret_code_ = ret_code; }
int get_ret_code() const { return ret_code_; }
virtual int update_redef_task_info(ObTableRedefinitionTask& redef_task) override; virtual int update_redef_task_info(ObTableRedefinitionTask& redef_task) override;
virtual int update_task_info_in_queue(ObTableRedefinitionTask& redef_task, virtual int update_task_info_in_queue(ObTableRedefinitionTask& redef_task,
ObDDLTaskQueue &ddl_task_queue) override; ObDDLTaskQueue &ddl_task_queue) override;
int set_ret_code(const int ret_code);
private: private:
int ret_code_; int ret_code_;
}; };

View File

@ -2820,6 +2820,34 @@ int ObDDLTaskRecordOperator::update_status_and_message(
return ret; return ret;
} }
int ObDDLTaskRecordOperator::update_ret_code_and_message(
common::ObISQLClient &proxy,
const uint64_t tenant_id,
const int64_t task_id,
const int ret_code,
ObString &message)
{
int ret = OB_SUCCESS;
ObSqlString sql_string;
ObSqlString message_string;
int64_t affected_rows = 0;
if (OB_UNLIKELY(task_id <= 0 || tenant_id <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(task_id), K(tenant_id));
} else if (OB_FAIL(to_hex_str(message, message_string))) {
LOG_WARN("append hex escaped string failed", K(ret));
} else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET ret_code = %d, message = \"%.*s\" WHERE task_id = %lu",
OB_ALL_DDL_TASK_STATUS_TNAME, ret_code, static_cast<int>(message_string.length()), message_string.ptr(), task_id))) {
LOG_WARN("assign sql string failed", K(ret), K(ret_code), K(task_id));
} else if (OB_FAIL(proxy.write(tenant_id, sql_string.ptr(), affected_rows))) {
LOG_WARN("update status of ddl task record failed", K(ret), K(sql_string));
} else if (OB_UNLIKELY(affected_rows < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows));
}
return ret;
}
int ObDDLTaskRecordOperator::delete_record(common::ObMySQLProxy &proxy, const uint64_t tenant_id, const int64_t task_id) int ObDDLTaskRecordOperator::delete_record(common::ObMySQLProxy &proxy, const uint64_t tenant_id, const int64_t task_id)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -210,6 +210,13 @@ public:
const int64_t task_status, const int64_t task_status,
ObString &message); ObString &message);
static int update_ret_code_and_message(
common::ObISQLClient &proxy,
const uint64_t tenant_id,
const int64_t task_id,
const int ret_code,
ObString &message);
static int delete_record( static int delete_record(
common::ObMySQLProxy &proxy, common::ObMySQLProxy &proxy,
const uint64_t tenant_id, const uint64_t tenant_id,

View File

@ -1063,7 +1063,8 @@ int64_t ObTableRedefinitionTask::get_serialize_param_size() const
+ serialization::encoded_length_i8(copy_indexes) + serialization::encoded_length_i8(copy_triggers) + serialization::encoded_length_i8(copy_indexes) + serialization::encoded_length_i8(copy_triggers)
+ serialization::encoded_length_i8(copy_constraints) + serialization::encoded_length_i8(copy_foreign_keys) + serialization::encoded_length_i8(copy_constraints) + serialization::encoded_length_i8(copy_foreign_keys)
+ serialization::encoded_length_i8(ignore_errors) + serialization::encoded_length_i8(do_finish) + serialization::encoded_length_i8(ignore_errors) + serialization::encoded_length_i8(do_finish)
+ serialization::encoded_length_i64(target_cg_cnt_); + serialization::encoded_length_i64(target_cg_cnt_)
+ serialization::encoded_length_i64(complete_sstable_job_ret_code_);
} }
int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const
@ -1096,6 +1097,8 @@ int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_
LOG_WARN("fail to serialize is_do_finish", K(ret)); LOG_WARN("fail to serialize is_do_finish", K(ret));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, target_cg_cnt_))) { } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, target_cg_cnt_))) {
LOG_WARN("fail to serialize target_cg_cnt", K(ret)); LOG_WARN("fail to serialize target_cg_cnt", K(ret));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, complete_sstable_job_ret_code_))) {
LOG_WARN("fail to serialize complete sstable job ret code", K(ret));
} }
FLOG_INFO("serialize message for table redefinition", K(ret), FLOG_INFO("serialize message for table redefinition", K(ret),
K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this)); K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this));
@ -1147,6 +1150,11 @@ int ObTableRedefinitionTask::deserlize_params_from_message(const uint64_t tenant
is_ignore_errors_ = static_cast<bool>(ignore_errors); is_ignore_errors_ = static_cast<bool>(ignore_errors);
is_do_finish_ = static_cast<bool>(do_finish); is_do_finish_ = static_cast<bool>(do_finish);
} }
if (OB_SUCC(ret) && pos < data_len) {
if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &complete_sstable_job_ret_code_))) {
LOG_WARN("fail to deserialize is_do_finish_", K(ret));
}
}
} }
FLOG_INFO("deserialize message for table redefinition", K(ret), FLOG_INFO("deserialize message for table redefinition", K(ret),
K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this)); K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this));