save ddl ret_code when switch_status
This commit is contained in:

committed by
wangzelin.wzl

parent
dc586042af
commit
975f88770d
@ -97,6 +97,7 @@ int ObColumnRedefinitionTask::init(const ObDDLTaskRecord &task_record)
|
||||
task_status_ = static_cast<ObDDLTaskStatus>(task_record.task_status_);
|
||||
snapshot_version_ = task_record.snapshot_version_;
|
||||
tenant_id_ = task_record.tenant_id_;
|
||||
ret_code_ = task_record.ret_code_;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
|
@ -553,6 +553,7 @@ int ObConstraintTask::init(const ObDDLTaskRecord &task_record)
|
||||
task_id_ = task_record.task_id_;
|
||||
parent_task_id_ = task_record.parent_task_id_;
|
||||
is_table_hidden_ = table_schema->is_user_hidden_table();
|
||||
ret_code_ = task_record.ret_code_;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
|
@ -207,6 +207,7 @@ int ObDDLRetryTask::init(const ObDDLTaskRecord &task_record)
|
||||
task_type_ = task_record.ddl_type_;
|
||||
parent_task_id_ = task_record.parent_task_id_;
|
||||
task_version_ = task_record.task_version_;
|
||||
ret_code_ = task_record.ret_code_;
|
||||
task_status_ = static_cast<ObDDLTaskStatus>(task_record.task_status_);
|
||||
if (nullptr != task_record.message_) {
|
||||
int64_t pos = 0;
|
||||
|
@ -259,6 +259,7 @@ int ObDDLTask::convert_to_record(
|
||||
task_record.task_id_ = get_task_id();
|
||||
task_record.parent_task_id_ = get_parent_task_id();
|
||||
task_record.task_version_ = get_task_version();
|
||||
task_record.ret_code_ = get_ret_code();
|
||||
const ObString &ddl_stmt_str = get_ddl_stmt_str();
|
||||
if (serialize_param_size > 0) {
|
||||
char *buf = nullptr;
|
||||
@ -328,6 +329,8 @@ int ObDDLTask::switch_status(ObDDLTaskStatus new_status, const int ret_code)
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_task_status(
|
||||
trans, tenant_id_, task_id_, static_cast<int64_t>(real_new_status)))) {
|
||||
LOG_WARN("update task status failed", K(ret), K(task_id_), K(new_status));
|
||||
} else if (OB_FAIL(ObDDLTaskRecordOperator::update_ret_code(trans, tenant_id_, task_id_, ret_code_))) {
|
||||
LOG_WARN("failed to update ret code", K(ret));
|
||||
}
|
||||
|
||||
bool commit = (OB_SUCCESS == ret);
|
||||
@ -1378,7 +1381,8 @@ bool ObDDLTaskRecord::is_valid() const
|
||||
&& tenant_id_ > 0
|
||||
&& task_version_ > 0
|
||||
&& OB_INVALID_ID != object_id_
|
||||
&& schema_version_ > 0;
|
||||
&& schema_version_ > 0
|
||||
&& ret_code_ >= 0;
|
||||
return is_valid;
|
||||
}
|
||||
|
||||
@ -1396,6 +1400,7 @@ void ObDDLTaskRecord::reset()
|
||||
snapshot_version_ = 0;
|
||||
message_.reset();
|
||||
task_version_ = 0;
|
||||
ret_code_ = OB_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
@ -1448,26 +1453,51 @@ int ObDDLTaskRecordOperator::update_snapshot_version(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::update_message(
|
||||
common::ObMySQLProxy &proxy,
|
||||
int ObDDLTaskRecordOperator::update_ret_code(
|
||||
common::ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
const char *message)
|
||||
const int64_t ret_code)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql_string;
|
||||
int64_t affected_rows = 0;
|
||||
if (OB_UNLIKELY(!proxy.is_inited() || nullptr == message
|
||||
|| 0 == strlen(message)
|
||||
|| strlen(message) >= ObDDLTaskRecord::MAX_MESSAGE_LENGTH
|
||||
if (OB_ISNULL(sql_client.get_pool()) || OB_UNLIKELY(task_id <= 0 || tenant_id <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arg", K(ret), K(tenant_id), K(task_id));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET ret_code=%lu WHERE task_id=%lu ",
|
||||
OB_ALL_DDL_TASK_STATUS_TNAME, ret_code, task_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret), K(ret_code), K(task_id));
|
||||
} else if (OB_FAIL(sql_client.write(tenant_id, sql_string.ptr(), affected_rows))) {
|
||||
LOG_WARN("update snapshot_version of ddl task record failed", K(ret), K(sql_string));
|
||||
} else if (OB_UNLIKELY(affected_rows < 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::update_message(
|
||||
common::ObISQLClient &proxy,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
const ObString &message)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql_string;
|
||||
ObSqlString message_string;
|
||||
int64_t affected_rows = 0;
|
||||
if (OB_UNLIKELY(message.empty()
|
||||
|| tenant_id <= 0 || task_id <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()), K(tenant_id), K(task_id), K(message));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET message='%s' WHERE task_id=%lu",
|
||||
OB_ALL_DDL_TASK_STATUS_TNAME, message, task_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret), K(message));
|
||||
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(task_id), K(message));
|
||||
} else if (OB_FAIL(to_hex_str(message, message_string))) {
|
||||
LOG_WARN("append hex escaped string failed", K(ret));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" UPDATE %s SET message=\"%.*s\" WHERE task_id=%lu",
|
||||
OB_ALL_DDL_TASK_STATUS_TNAME, static_cast<int>(message_string.length()), message_string.ptr(), task_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret), K(message_string));
|
||||
} else if (OB_FAIL(proxy.write(tenant_id, sql_string.ptr(), affected_rows))) {
|
||||
LOG_WARN("update message of ddl task record failed", K(ret), K(sql_string));
|
||||
LOG_WARN("update message of ddl task record failed", K(ret), K(sql_string), K(message_string));
|
||||
} else if (OB_UNLIKELY(affected_rows < 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows));
|
||||
@ -1728,11 +1758,11 @@ int ObDDLTaskRecordOperator::insert_record(
|
||||
} else if (OB_FAIL(to_hex_str(record.message_, message_string))) {
|
||||
LOG_WARN("append hex escaped string failed", K(ret));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(
|
||||
" INSERT INTO %s (task_id, parent_task_id, tenant_id, object_id, schema_version, target_object_id, ddl_type, trace_id, status, task_version, ddl_stmt_str, message) "
|
||||
" VALUES (%lu, %lu, %lu, %lu, %lu, %lu, %d, '%s', %ld, %lu, '%.*s', \"%.*s\") ",
|
||||
" INSERT INTO %s (task_id, parent_task_id, tenant_id, object_id, schema_version, target_object_id, ddl_type, trace_id, status, task_version, ret_code, ddl_stmt_str, message) "
|
||||
" VALUES (%lu, %lu, %lu, %lu, %lu, %lu, %d, '%s', %ld, %lu, %lu, '%.*s', \"%.*s\") ",
|
||||
OB_ALL_DDL_TASK_STATUS_TNAME, record.task_id_, record.parent_task_id_,
|
||||
ObSchemaUtils::get_extract_tenant_id(record.tenant_id_, record.tenant_id_), record.object_id_, record.schema_version_,
|
||||
get_record_id(record.ddl_type_, record.target_object_id_), record.ddl_type_, trace_id_str, record.task_status_, record.task_version_,
|
||||
get_record_id(record.ddl_type_, record.target_object_id_), record.ddl_type_, trace_id_str, record.task_status_, record.task_version_, record.ret_code_,
|
||||
static_cast<int>(ddl_stmt_string.length()), ddl_stmt_string.ptr(), static_cast<int>(message_string.length()), message_string.ptr()))) {
|
||||
LOG_WARN("assign sql string failed", K(ret), K(record));
|
||||
} else if (OB_FAIL(proxy.write(record.tenant_id_, sql_string.ptr(), affected_rows))) {
|
||||
@ -1772,6 +1802,7 @@ int ObDDLTaskRecordOperator::fill_task_record(
|
||||
EXTRACT_INT_FIELD_MYSQL(*result_row, "status", task_record.task_status_, int64_t);
|
||||
EXTRACT_UINT_FIELD_MYSQL(*result_row, "snapshot_version", task_record.snapshot_version_, uint64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result_row, "task_version", task_record.task_version_, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result_row, "ret_code", task_record.ret_code_, int64_t);
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "message_unhex", task_message);
|
||||
EXTRACT_VARCHAR_FIELD_MYSQL(*result_row, "ddl_stmt_str_unhex", ddl_stmt_str);
|
||||
if (OB_SUCC(ret)) {
|
||||
|
@ -52,7 +52,7 @@ public:
|
||||
bool is_valid() const;
|
||||
void reset();
|
||||
TO_STRING_KV(K_(task_id), K_(parent_task_id), K_(ddl_type), K_(trace_id), K_(task_status), K_(tenant_id), K_(object_id),
|
||||
K_(schema_version), K_(target_object_id), K_(snapshot_version), K_(message), K_(task_version));
|
||||
K_(schema_version), K_(target_object_id), K_(snapshot_version), K_(message), K_(task_version), K_(ret_code));
|
||||
public:
|
||||
static const int64_t MAX_MESSAGE_LENGTH = 4096;
|
||||
typedef common::ObFixedLengthString<MAX_MESSAGE_LENGTH> TaskMessage;
|
||||
@ -69,6 +69,7 @@ public:
|
||||
int64_t snapshot_version_;
|
||||
ObString message_;
|
||||
int64_t task_version_;
|
||||
int64_t ret_code_;
|
||||
ObString ddl_stmt_str_;
|
||||
};
|
||||
|
||||
@ -119,11 +120,17 @@ public:
|
||||
const int64_t task_id,
|
||||
const int64_t snapshot_version);
|
||||
|
||||
static int update_message(
|
||||
common::ObMySQLProxy &proxy,
|
||||
static int update_ret_code(
|
||||
common::ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
const char *message);
|
||||
const int64_t ret_code);
|
||||
|
||||
static int update_message(
|
||||
common::ObISQLClient &proxy,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t task_id,
|
||||
const ObString &message);
|
||||
|
||||
static int delete_record(
|
||||
common::ObMySQLProxy &proxy,
|
||||
|
@ -86,6 +86,7 @@ int ObDropIndexTask::init(
|
||||
task_id_ = task_record.task_id_;
|
||||
parent_task_id_ = task_record.parent_task_id_;
|
||||
task_version_ = task_record.task_version_;
|
||||
ret_code_ = task_record.ret_code_;
|
||||
if (nullptr != task_record.message_.ptr()) {
|
||||
int64_t pos = 0;
|
||||
if (OB_FAIL(deserlize_params_from_message(task_record.message_.ptr(), task_record.message_.length(), pos))) {
|
||||
|
@ -349,6 +349,7 @@ int ObIndexBuildTask::init(const ObDDLTaskRecord &task_record)
|
||||
}
|
||||
task_id_ = task_record.task_id_;
|
||||
parent_task_id_ = task_record.parent_task_id_;
|
||||
ret_code_ = task_record.ret_code_;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
|
@ -210,6 +210,7 @@ int ObModifyAutoincTask::init(const ObDDLTaskRecord &task_record)
|
||||
snapshot_version_ = task_record.snapshot_version_;
|
||||
tenant_id_ = task_record.tenant_id_;
|
||||
task_id_ = task_record.task_id_;
|
||||
ret_code_ = task_record.ret_code_;
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
|
@ -98,6 +98,7 @@ int ObTableRedefinitionTask::init(const ObDDLTaskRecord &task_record)
|
||||
task_status_ = static_cast<ObDDLTaskStatus>(task_record.task_status_);
|
||||
snapshot_version_ = task_record.snapshot_version_;
|
||||
tenant_id_ = task_record.tenant_id_;
|
||||
ret_code_ = task_record.ret_code_;
|
||||
alter_table_arg_.exec_tenant_id_ = tenant_id_;
|
||||
is_inited_ = true;
|
||||
}
|
||||
|
Reference in New Issue
Block a user