From 975f88770da603c2de33352a3b0580ebcdf364de Mon Sep 17 00:00:00 2001 From: YoungYang0820 Date: Thu, 17 Nov 2022 02:38:41 +0000 Subject: [PATCH] save ddl ret_code when switch_status --- .../ddl_task/ob_column_redefinition_task.cpp | 1 + .../ddl_task/ob_constraint_task.cpp | 1 + src/rootserver/ddl_task/ob_ddl_retry_task.cpp | 1 + src/rootserver/ddl_task/ob_ddl_task.cpp | 61 ++++++++++++++----- src/rootserver/ddl_task/ob_ddl_task.h | 15 +++-- .../ddl_task/ob_drop_index_task.cpp | 1 + .../ddl_task/ob_index_build_task.cpp | 1 + .../ddl_task/ob_modify_autoinc_task.cpp | 1 + .../ddl_task/ob_table_redefinition_task.cpp | 1 + 9 files changed, 64 insertions(+), 19 deletions(-) diff --git a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp index a8b85c1f99..dfcb0eb87f 100644 --- a/src/rootserver/ddl_task/ob_column_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_column_redefinition_task.cpp @@ -97,6 +97,7 @@ int ObColumnRedefinitionTask::init(const ObDDLTaskRecord &task_record) task_status_ = static_cast(task_record.task_status_); snapshot_version_ = task_record.snapshot_version_; tenant_id_ = task_record.tenant_id_; + ret_code_ = task_record.ret_code_; is_inited_ = true; } return ret; diff --git a/src/rootserver/ddl_task/ob_constraint_task.cpp b/src/rootserver/ddl_task/ob_constraint_task.cpp index 6f1fba2e00..24a15b29b8 100644 --- a/src/rootserver/ddl_task/ob_constraint_task.cpp +++ b/src/rootserver/ddl_task/ob_constraint_task.cpp @@ -553,6 +553,7 @@ int ObConstraintTask::init(const ObDDLTaskRecord &task_record) task_id_ = task_record.task_id_; parent_task_id_ = task_record.parent_task_id_; is_table_hidden_ = table_schema->is_user_hidden_table(); + ret_code_ = task_record.ret_code_; is_inited_ = true; } return ret; diff --git a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp index 9c89c60e65..310f27d9b5 100644 --- a/src/rootserver/ddl_task/ob_ddl_retry_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_retry_task.cpp @@ -207,6 +207,7 @@ int ObDDLRetryTask::init(const ObDDLTaskRecord &task_record) task_type_ = task_record.ddl_type_; parent_task_id_ = task_record.parent_task_id_; task_version_ = task_record.task_version_; + ret_code_ = task_record.ret_code_; task_status_ = static_cast(task_record.task_status_); if (nullptr != task_record.message_) { int64_t pos = 0; diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index ade7094ae5..0b621a1315 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -259,6 +259,7 @@ int ObDDLTask::convert_to_record( task_record.task_id_ = get_task_id(); task_record.parent_task_id_ = get_parent_task_id(); task_record.task_version_ = get_task_version(); + task_record.ret_code_ = get_ret_code(); const ObString &ddl_stmt_str = get_ddl_stmt_str(); if (serialize_param_size > 0) { char *buf = nullptr; @@ -328,6 +329,8 @@ int ObDDLTask::switch_status(ObDDLTaskStatus new_status, const int ret_code) } else if (OB_FAIL(ObDDLTaskRecordOperator::update_task_status( trans, tenant_id_, task_id_, static_cast(real_new_status)))) { LOG_WARN("update task status failed", K(ret), K(task_id_), K(new_status)); + } else if (OB_FAIL(ObDDLTaskRecordOperator::update_ret_code(trans, tenant_id_, task_id_, ret_code_))) { + LOG_WARN("failed to update ret code", K(ret)); } bool commit = (OB_SUCCESS == ret); @@ -1378,7 +1381,8 @@ bool ObDDLTaskRecord::is_valid() const && tenant_id_ > 0 && task_version_ > 0 && OB_INVALID_ID != object_id_ - && schema_version_ > 0; + && schema_version_ > 0 + && ret_code_ >= 0; return is_valid; } @@ -1396,6 +1400,7 @@ void ObDDLTaskRecord::reset() snapshot_version_ = 0; message_.reset(); task_version_ = 0; + ret_code_ = OB_SUCCESS; } @@ -1448,26 +1453,51 @@ int ObDDLTaskRecordOperator::update_snapshot_version( return ret; } -int ObDDLTaskRecordOperator::update_message( - common::ObMySQLProxy &proxy, +int ObDDLTaskRecordOperator::update_ret_code( + common::ObISQLClient &sql_client, const uint64_t tenant_id, const int64_t task_id, - const char *message) + const int64_t ret_code) { int ret = OB_SUCCESS; ObSqlString sql_string; int64_t affected_rows = 0; - if (OB_UNLIKELY(!proxy.is_inited() || nullptr == message - || 0 == strlen(message) - || strlen(message) >= ObDDLTaskRecord::MAX_MESSAGE_LENGTH + if (OB_ISNULL(sql_client.get_pool()) || OB_UNLIKELY(task_id <= 0 || tenant_id <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arg", K(ret), K(tenant_id), K(task_id)); + } else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET ret_code=%lu WHERE task_id=%lu ", + OB_ALL_DDL_TASK_STATUS_TNAME, ret_code, task_id))) { + LOG_WARN("assign sql string failed", K(ret), K(ret_code), K(task_id)); + } else if (OB_FAIL(sql_client.write(tenant_id, sql_string.ptr(), affected_rows))) { + LOG_WARN("update snapshot_version of ddl task record failed", K(ret), K(sql_string)); + } else if (OB_UNLIKELY(affected_rows < 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows)); + } + return ret; +} + +int ObDDLTaskRecordOperator::update_message( + common::ObISQLClient &proxy, + const uint64_t tenant_id, + const int64_t task_id, + const ObString &message) +{ + int ret = OB_SUCCESS; + ObSqlString sql_string; + ObSqlString message_string; + int64_t affected_rows = 0; + if (OB_UNLIKELY(message.empty() || tenant_id <= 0 || task_id <= 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()), K(tenant_id), K(task_id), K(message)); - } else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET message='%s' WHERE task_id=%lu", - OB_ALL_DDL_TASK_STATUS_TNAME, message, task_id))) { - LOG_WARN("assign sql string failed", K(ret), K(message)); + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), K(message)); + } else if (OB_FAIL(to_hex_str(message, message_string))) { + LOG_WARN("append hex escaped string failed", K(ret)); + } else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET message=\"%.*s\" WHERE task_id=%lu", + OB_ALL_DDL_TASK_STATUS_TNAME, static_cast(message_string.length()), message_string.ptr(), task_id))) { + LOG_WARN("assign sql string failed", K(ret), K(message_string)); } else if (OB_FAIL(proxy.write(tenant_id, sql_string.ptr(), affected_rows))) { - LOG_WARN("update message of ddl task record failed", K(ret), K(sql_string)); + LOG_WARN("update message of ddl task record failed", K(ret), K(sql_string), K(message_string)); } else if (OB_UNLIKELY(affected_rows < 0)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows)); @@ -1728,11 +1758,11 @@ int ObDDLTaskRecordOperator::insert_record( } else if (OB_FAIL(to_hex_str(record.message_, message_string))) { LOG_WARN("append hex escaped string failed", K(ret)); } else if (OB_FAIL(sql_string.assign_fmt( - " INSERT INTO %s (task_id, parent_task_id, tenant_id, object_id, schema_version, target_object_id, ddl_type, trace_id, status, task_version, ddl_stmt_str, message) " - " VALUES (%lu, %lu, %lu, %lu, %lu, %lu, %d, '%s', %ld, %lu, '%.*s', \"%.*s\") ", + " INSERT INTO %s (task_id, parent_task_id, tenant_id, object_id, schema_version, target_object_id, ddl_type, trace_id, status, task_version, ret_code, ddl_stmt_str, message) " + " VALUES (%lu, %lu, %lu, %lu, %lu, %lu, %d, '%s', %ld, %lu, %lu, '%.*s', \"%.*s\") ", OB_ALL_DDL_TASK_STATUS_TNAME, record.task_id_, record.parent_task_id_, ObSchemaUtils::get_extract_tenant_id(record.tenant_id_, record.tenant_id_), record.object_id_, record.schema_version_, - get_record_id(record.ddl_type_, record.target_object_id_), record.ddl_type_, trace_id_str, record.task_status_, record.task_version_, + get_record_id(record.ddl_type_, record.target_object_id_), record.ddl_type_, trace_id_str, record.task_status_, record.task_version_, record.ret_code_, static_cast(ddl_stmt_string.length()), ddl_stmt_string.ptr(), static_cast(message_string.length()), message_string.ptr()))) { LOG_WARN("assign sql string failed", K(ret), K(record)); } else if (OB_FAIL(proxy.write(record.tenant_id_, sql_string.ptr(), affected_rows))) { @@ -1772,6 +1802,7 @@ int ObDDLTaskRecordOperator::fill_task_record( EXTRACT_INT_FIELD_MYSQL(*result_row, "status", task_record.task_status_, int64_t); EXTRACT_UINT_FIELD_MYSQL(*result_row, "snapshot_version", task_record.snapshot_version_, uint64_t); EXTRACT_INT_FIELD_MYSQL(*result_row, "task_version", task_record.task_version_, int64_t); + EXTRACT_INT_FIELD_MYSQL(*result_row, "ret_code", task_record.ret_code_, int64_t); EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "message_unhex", task_message); EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "ddl_stmt_str_unhex", ddl_stmt_str); if (OB_SUCC(ret)) { diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index 64bfddfdc5..c210faa131 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -52,7 +52,7 @@ public: bool is_valid() const; void reset(); TO_STRING_KV(K_(task_id), K_(parent_task_id), K_(ddl_type), K_(trace_id), K_(task_status), K_(tenant_id), K_(object_id), - K_(schema_version), K_(target_object_id), K_(snapshot_version), K_(message), K_(task_version)); + K_(schema_version), K_(target_object_id), K_(snapshot_version), K_(message), K_(task_version), K_(ret_code)); public: static const int64_t MAX_MESSAGE_LENGTH = 4096; typedef common::ObFixedLengthString TaskMessage; @@ -69,6 +69,7 @@ public: int64_t snapshot_version_; ObString message_; int64_t task_version_; + int64_t ret_code_; ObString ddl_stmt_str_; }; @@ -119,11 +120,17 @@ public: const int64_t task_id, const int64_t snapshot_version); - static int update_message( - common::ObMySQLProxy &proxy, + static int update_ret_code( + common::ObISQLClient &sql_client, const uint64_t tenant_id, const int64_t task_id, - const char *message); + const int64_t ret_code); + + static int update_message( + common::ObISQLClient &proxy, + const uint64_t tenant_id, + const int64_t task_id, + const ObString &message); static int delete_record( common::ObMySQLProxy &proxy, diff --git a/src/rootserver/ddl_task/ob_drop_index_task.cpp b/src/rootserver/ddl_task/ob_drop_index_task.cpp index 3e30b88d9f..fc3c10ae8d 100644 --- a/src/rootserver/ddl_task/ob_drop_index_task.cpp +++ b/src/rootserver/ddl_task/ob_drop_index_task.cpp @@ -86,6 +86,7 @@ int ObDropIndexTask::init( task_id_ = task_record.task_id_; parent_task_id_ = task_record.parent_task_id_; task_version_ = task_record.task_version_; + ret_code_ = task_record.ret_code_; if (nullptr != task_record.message_.ptr()) { int64_t pos = 0; if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) { diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index 6b19f3ff19..f6d30dfac9 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -349,6 +349,7 @@ int ObIndexBuildTask::init(const ObDDLTaskRecord &task_record) } task_id_ = task_record.task_id_; parent_task_id_ = task_record.parent_task_id_; + ret_code_ = task_record.ret_code_; is_inited_ = true; } return ret; diff --git a/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp b/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp index 151924a60c..69af509bb7 100644 --- a/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp +++ b/src/rootserver/ddl_task/ob_modify_autoinc_task.cpp @@ -210,6 +210,7 @@ int ObModifyAutoincTask::init(const ObDDLTaskRecord &task_record) snapshot_version_ = task_record.snapshot_version_; tenant_id_ = task_record.tenant_id_; task_id_ = task_record.task_id_; + ret_code_ = task_record.ret_code_; is_inited_ = true; } return ret; diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 9d90a8cc5d..f018f5d6f2 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -98,6 +98,7 @@ int ObTableRedefinitionTask::init(const ObDDLTaskRecord &task_record) task_status_ = static_cast(task_record.task_status_); snapshot_version_ = task_record.snapshot_version_; tenant_id_ = task_record.tenant_id_; + ret_code_ = task_record.ret_code_; alter_table_arg_.exec_tenant_id_ = tenant_id_; is_inited_ = true; }