fix switch leader cause map data is deleted bug
This commit is contained in:
@ -2626,50 +2626,90 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::get_all_record(
|
||||
common::ObMySQLProxy &proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
common::ObIArray<ObDDLTaskRecord> &records)
|
||||
int ObDDLTaskRecordOperator::get_task_record(const ObSqlString &sql_string,
|
||||
common::ObMySQLProxy &proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
common::ObIArray<ObDDLTaskRecord> &records)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
records.reset();
|
||||
if (OB_UNLIKELY(!proxy.is_inited())) {
|
||||
if (OB_UNLIKELY(!proxy.is_inited() || !sql_string.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()));
|
||||
LOG_WARN("invalid argment", K(ret), K(proxy.is_inited()), K(sql_string));
|
||||
} else {
|
||||
ObSqlString sql_string;
|
||||
records.reset();
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObDDLTaskRecord record;
|
||||
sqlclient::ObMySQLResult *result = NULL;
|
||||
if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s ", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
} else if (OB_FAIL(proxy.read(res, sql_string.ptr()))) {
|
||||
if (OB_FAIL(proxy.read(res, sql_string.ptr()))) {
|
||||
LOG_WARN("query ddl task record failed", K(ret), K(sql_string));
|
||||
} else if (OB_ISNULL((result = res.get_result()))) {
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get sql result", K(ret), KP(result));
|
||||
} else {
|
||||
ObDDLTaskRecord task_record;
|
||||
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
|
||||
if (OB_FAIL(fill_task_record(result, allocator, task_record))) {
|
||||
if (OB_FAIL(fill_task_record(result, allocator, record))) {
|
||||
LOG_WARN("fill index task failed", K(ret), K(result));
|
||||
} else if (!task_record.is_valid()) {
|
||||
} else if (!record.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("task record is invalid", K(ret), K(task_record));
|
||||
} else if (OB_FAIL(records.push_back(task_record))) {
|
||||
LOG_WARN("push back task record failed", K(ret), K(task_record));
|
||||
LOG_WARN("task record is invalid", K(ret), K(record));
|
||||
} else if (OB_FAIL(records.push_back(record))) {
|
||||
LOG_WARN("push back failed", K(record));
|
||||
}
|
||||
}
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::get_ddl_task_record(const int64_t task_id,
|
||||
common::ObMySQLProxy &proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
ObDDLTaskRecord &record)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql_string;
|
||||
ObArray<ObDDLTaskRecord> task_records;
|
||||
if (OB_UNLIKELY(!proxy.is_inited())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s WHERE task_id=%lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, task_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret), K(task_id));
|
||||
} else if (OB_FAIL(get_task_record(sql_string, proxy, allocator, task_records))) {
|
||||
LOG_WARN("get task record failed", K(ret), K(sql_string));
|
||||
} else if (task_records.count() != 1) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("one task_id but task_records count() != 1", K(ret), K(task_id), K(task_records));
|
||||
} else if (OB_FAIL(task_records.at(0, record))) {
|
||||
LOG_WARN("get task_record failed", K(ret), K(task_id));
|
||||
} else if (!record.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("task record not valid", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::get_all_ddl_task_record(common::ObMySQLProxy &proxy,
|
||||
common::ObIAllocator &allocator,
|
||||
common::ObIArray<ObDDLTaskRecord> &records)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql_string;
|
||||
if (OB_UNLIKELY(!proxy.is_inited())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s ", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
} else if (OB_FAIL(get_task_record(sql_string, proxy, allocator, records))) {
|
||||
LOG_WARN("get task record failed", K(ret), K(sql_string));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTaskRecordOperator::check_task_id_exist(common::ObMySQLProxy &proxy, const int64_t task_id, bool &exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
Reference in New Issue
Block a user