From a780f3b5d530d04dddeb27465fc678b6ae3c1f55 Mon Sep 17 00:00:00 2001 From: Charles0429 Date: Sat, 10 Feb 2024 07:21:58 +0000 Subject: [PATCH] fix direct load task lost ret code when switch rootservice leader --- src/rootserver/ddl_task/ob_ddl_scheduler.cpp | 10 ++----- src/rootserver/ddl_task/ob_ddl_scheduler.h | 5 ++-- src/rootserver/ddl_task/ob_ddl_task.cpp | 28 +++++++++++++++++++ src/rootserver/ddl_task/ob_ddl_task.h | 7 +++++ .../ddl_task/ob_table_redefinition_task.cpp | 10 ++++++- 5 files changed, 49 insertions(+), 11 deletions(-) diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index db51d57bb..2a8956cde 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -513,9 +513,10 @@ int ObRedefCallback::modify_info(ObTableRedefinitionTask &redef_task, } else { message.reset(); message.assign(buf, serialize_param_size); - if (OB_FAIL(ObDDLTaskRecordOperator::update_message(trans, + if (OB_FAIL(ObDDLTaskRecordOperator::update_ret_code_and_message(trans, redef_task.get_tenant_id(), redef_task.get_task_id(), + redef_task.get_ret_code(), message))) { LOG_WARN("update task message failed", K(ret), K(redef_task.get_tenant_id()), K(redef_task.get_task_id()), K(message)); @@ -652,13 +653,6 @@ int ObFinishRedefCallback::update_task_info_in_queue(ObTableRedefinitionTask& re return ddl_task_queue.update_task_process_schedulable(ObDDLTaskID(redef_task.get_tenant_id(),redef_task.get_task_id())); } -int ObUpdateSSTableCompleteStatusCallback::set_ret_code(const int ret_code) -{ - int ret = OB_SUCCESS; - ret_code_ = ret_code; - return ret; -} - int ObUpdateSSTableCompleteStatusCallback::update_redef_task_info(ObTableRedefinitionTask &redef_task) { int ret = OB_SUCCESS; diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index ed545e4e6..8fdc130ec 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -210,13 +210,14 @@ class ObUpdateSSTableCompleteStatusCallback : public ObRedefCallback { public: ObUpdateSSTableCompleteStatusCallback() - : ret_code_(OB_SUCCESS) + : ret_code_(common::OB_SUCCESS) {} ~ObUpdateSSTableCompleteStatusCallback() = default; + void set_ret_code (const int ret_code) { ret_code_ = ret_code; } + int get_ret_code() const { return ret_code_; } virtual int update_redef_task_info(ObTableRedefinitionTask& redef_task) override; virtual int update_task_info_in_queue(ObTableRedefinitionTask& redef_task, ObDDLTaskQueue &ddl_task_queue) override; - int set_ret_code(const int ret_code); private: int ret_code_; }; diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index b727500d8..af6a4a82c 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -2820,6 +2820,34 @@ int ObDDLTaskRecordOperator::update_status_and_message( return ret; } +int ObDDLTaskRecordOperator::update_ret_code_and_message( + common::ObISQLClient &proxy, + const uint64_t tenant_id, + const int64_t task_id, + const int ret_code, + ObString &message) +{ + int ret = OB_SUCCESS; + ObSqlString sql_string; + ObSqlString message_string; + int64_t affected_rows = 0; + if (OB_UNLIKELY(task_id <= 0 || tenant_id <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(task_id), K(tenant_id)); + } else if (OB_FAIL(to_hex_str(message, message_string))) { + LOG_WARN("append hex escaped string failed", K(ret)); + } else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET ret_code = %d, message = \"%.*s\" WHERE task_id = %lu", + OB_ALL_DDL_TASK_STATUS_TNAME, ret_code, static_cast(message_string.length()), message_string.ptr(), task_id))) { + LOG_WARN("assign sql string failed", K(ret), K(ret_code), K(task_id)); + } else if (OB_FAIL(proxy.write(tenant_id, sql_string.ptr(), affected_rows))) { + LOG_WARN("update status of ddl task record failed", K(ret), K(sql_string)); + } else if (OB_UNLIKELY(affected_rows < 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows)); + } + return ret; +} + int ObDDLTaskRecordOperator::delete_record(common::ObMySQLProxy &proxy, const uint64_t tenant_id, const int64_t task_id) { int ret = OB_SUCCESS; diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index 40f42f4f8..201fbe9b7 100755 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -210,6 +210,13 @@ public: const int64_t task_status, ObString &message); + static int update_ret_code_and_message( + common::ObISQLClient &proxy, + const uint64_t tenant_id, + const int64_t task_id, + const int ret_code, + ObString &message); + static int delete_record( common::ObMySQLProxy &proxy, const uint64_t tenant_id, diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index a6acc6745..1182989fc 100755 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -1063,7 +1063,8 @@ int64_t ObTableRedefinitionTask::get_serialize_param_size() const + serialization::encoded_length_i8(copy_indexes) + serialization::encoded_length_i8(copy_triggers) + serialization::encoded_length_i8(copy_constraints) + serialization::encoded_length_i8(copy_foreign_keys) + serialization::encoded_length_i8(ignore_errors) + serialization::encoded_length_i8(do_finish) - + serialization::encoded_length_i64(target_cg_cnt_); + + serialization::encoded_length_i64(target_cg_cnt_) + + serialization::encoded_length_i64(complete_sstable_job_ret_code_); } int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_t buf_len, int64_t &pos) const @@ -1096,6 +1097,8 @@ int ObTableRedefinitionTask::serialize_params_to_message(char *buf, const int64_ LOG_WARN("fail to serialize is_do_finish", K(ret)); } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, target_cg_cnt_))) { LOG_WARN("fail to serialize target_cg_cnt", K(ret)); + } else if (OB_FAIL(serialization::encode_i64(buf, buf_len, pos, complete_sstable_job_ret_code_))) { + LOG_WARN("fail to serialize complete sstable job ret code", K(ret)); } FLOG_INFO("serialize message for table redefinition", K(ret), K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this)); @@ -1147,6 +1150,11 @@ int ObTableRedefinitionTask::deserlize_params_from_message(const uint64_t tenant is_ignore_errors_ = static_cast(ignore_errors); is_do_finish_ = static_cast(do_finish); } + if (OB_SUCC(ret) && pos < data_len) { + if (OB_FAIL(serialization::decode_i64(buf, data_len, pos, &complete_sstable_job_ret_code_))) { + LOG_WARN("fail to deserialize is_do_finish_", K(ret)); + } + } } FLOG_INFO("deserialize message for table redefinition", K(ret), K(copy_indexes), K(copy_triggers), K(copy_constraints), K(copy_foreign_keys), K(ignore_errors), K(do_finish), K(*this));