Correct Longops start time after recover ddl task
This commit is contained in:
parent
69b07be3fc
commit
01399a3774
@ -1775,7 +1775,7 @@ int ObDDLScheduler::schedule_build_index_task(
|
||||
LOG_WARN("init global_index_task failed", K(ret), K(task_record));
|
||||
} else if (OB_FAIL(build_index_task->set_trace_id(task_record.trace_id_))) {
|
||||
LOG_WARN("init build index task failed", K(ret));
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(build_index_task))) {
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(build_index_task, task_record))) {
|
||||
if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("inner schedule task failed", K(ret), K(*build_index_task));
|
||||
}
|
||||
@ -1802,7 +1802,7 @@ int ObDDLScheduler::schedule_drop_primary_key_task(const ObDDLTaskRecord &task_r
|
||||
LOG_WARN("init drop primary key task failed", K(ret));
|
||||
} else if (OB_FAIL(drop_pk_task->set_trace_id(task_record.trace_id_))) {
|
||||
LOG_WARN("set trace id failed", K(ret));
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(drop_pk_task))) {
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(drop_pk_task, task_record))) {
|
||||
if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("inner schedule task failed", K(ret), K(*drop_pk_task));
|
||||
}
|
||||
@ -1828,7 +1828,7 @@ int ObDDLScheduler::schedule_table_redefinition_task(const ObDDLTaskRecord &task
|
||||
LOG_WARN("init table redefinition task failed", K(ret));
|
||||
} else if (OB_FAIL(redefinition_task->set_trace_id(task_record.trace_id_))) {
|
||||
LOG_WARN("set trace id failed", K(ret));
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(redefinition_task))) {
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(redefinition_task, task_record))) {
|
||||
if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("inner schedule task failed", K(ret), K(*redefinition_task));
|
||||
}
|
||||
@ -1857,7 +1857,7 @@ int ObDDLScheduler::schedule_column_redefinition_task(const ObDDLTaskRecord &tas
|
||||
LOG_WARN("init column redefinition task failed", K(ret));
|
||||
} else if (OB_FAIL(redefinition_task->set_trace_id(task_record.trace_id_))) {
|
||||
LOG_WARN("set trace id failed", K(ret));
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(redefinition_task))) {
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(redefinition_task, task_record))) {
|
||||
if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("inner schedule task failed", K(ret), K(*redefinition_task));
|
||||
}
|
||||
@ -1883,7 +1883,7 @@ int ObDDLScheduler::schedule_ddl_retry_task(const ObDDLTaskRecord &task_record)
|
||||
LOG_WARN("init ddl retry task failed", K(ret));
|
||||
} else if (OB_FAIL(ddl_retry_task->set_trace_id(task_record.trace_id_))) {
|
||||
LOG_WARN("set trace id failed", K(ret));
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(ddl_retry_task))) {
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(ddl_retry_task, task_record))) {
|
||||
if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("inner schedule task failed", K(ret));
|
||||
}
|
||||
@ -1909,7 +1909,7 @@ int ObDDLScheduler::schedule_constraint_task(const ObDDLTaskRecord &task_record)
|
||||
LOG_WARN("init constraint task failed", K(ret));
|
||||
} else if (OB_FAIL(constraint_task->set_trace_id(task_record.trace_id_))) {
|
||||
LOG_WARN("set trace id failed", K(ret));
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(constraint_task))) {
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(constraint_task, task_record))) {
|
||||
if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("inner schedule task failed", K(ret));
|
||||
}
|
||||
@ -1935,7 +1935,7 @@ int ObDDLScheduler::schedule_modify_autoinc_task(const ObDDLTaskRecord &task_rec
|
||||
LOG_WARN("init constraint task failed", K(ret));
|
||||
} else if (OB_FAIL(modify_autoinc_task->set_trace_id(task_record.trace_id_))) {
|
||||
LOG_WARN("set trace id failed", K(ret));
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(modify_autoinc_task))) {
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(modify_autoinc_task, task_record))) {
|
||||
if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("inner schedule task failed", K(ret));
|
||||
}
|
||||
@ -1961,7 +1961,7 @@ int ObDDLScheduler::schedule_drop_index_task(const ObDDLTaskRecord &task_record)
|
||||
LOG_WARN("init drop index task failed", K(ret));
|
||||
} else if (OB_FAIL(drop_index_task->set_trace_id(task_record.trace_id_))) {
|
||||
LOG_WARN("set trace id failed", K(ret));
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(drop_index_task))) {
|
||||
} else if (OB_FAIL(inner_schedule_ddl_task(drop_index_task, task_record))) {
|
||||
if (OB_ENTRY_EXIST != ret) {
|
||||
LOG_WARN("inner schedule task failed", K(ret));
|
||||
}
|
||||
@ -1989,9 +1989,11 @@ int ObDDLScheduler::add_task_to_longops_mgr(ObDDLTask *ddl_task)
|
||||
} else if (OB_FAIL(longops_stat->init(ddl_task))) {
|
||||
LOG_WARN("failed to init longops stat", K(ret), KPC(ddl_task));
|
||||
} else if (OB_FAIL(longops_mgr.register_longops(longops_stat))) {
|
||||
LOG_WARN("failed to register longops", K(ret));
|
||||
if (OB_ENTRY_EXIST == ret) {
|
||||
LOG_WARN("longops already registered", K(ret));
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
LOG_WARN("failed to register longops", K(ret));
|
||||
}
|
||||
} else {
|
||||
ddl_task->set_longops_stat(longops_stat);
|
||||
@ -2040,7 +2042,8 @@ int ObDDLScheduler::remove_ddl_task(ObDDLTask *ddl_task)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLScheduler::inner_schedule_ddl_task(ObDDLTask *ddl_task)
|
||||
int ObDDLScheduler::inner_schedule_ddl_task(ObDDLTask *ddl_task,
|
||||
const ObDDLTaskRecord &task_record)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(ddl_task)) {
|
||||
@ -2052,6 +2055,7 @@ int ObDDLScheduler::inner_schedule_ddl_task(ObDDLTask *ddl_task)
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool longops_added = true;
|
||||
ddl_task->set_gmt_create(task_record.gmt_create_);
|
||||
if (OB_TMP_FAIL(add_task_to_longops_mgr(ddl_task))) {
|
||||
longops_added = false;
|
||||
LOG_WARN("add task to longops mgr failed", K(tmp_ret));
|
||||
|
@ -272,7 +272,8 @@ private:
|
||||
int alloc_ddl_task(T *&ddl_task);
|
||||
void free_ddl_task(ObDDLTask *ddl_task);
|
||||
void destroy_all_tasks();
|
||||
int inner_schedule_ddl_task(ObDDLTask *ddl_task);
|
||||
int inner_schedule_ddl_task(ObDDLTask *ddl_task,
|
||||
const ObDDLTaskRecord &task_record);
|
||||
int create_build_index_task(
|
||||
common::ObISQLClient &proxy,
|
||||
const share::schema::ObTableSchema *data_table_schema,
|
||||
|
@ -1280,7 +1280,7 @@ int ObDDLTask::copy_longops_stat(ObLongopsValue &value)
|
||||
int ret = OB_SUCCESS;
|
||||
value.trace_id_ = trace_id_;
|
||||
value.tenant_id_ = tenant_id_;
|
||||
value.start_time_ = stat_info_.start_time_;
|
||||
value.start_time_ = gmt_create_;
|
||||
value.finish_time_ = stat_info_.finish_time_;
|
||||
value.elapsed_seconds_ = (ObTimeUtility::current_time() - stat_info_.start_time_);
|
||||
value.time_remaining_ = stat_info_.time_remaining_;
|
||||
@ -2642,7 +2642,7 @@ int ObDDLTaskRecordOperator::check_is_adding_constraint(
|
||||
ObSqlString sql_string;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
sqlclient::ObMySQLResult *result = NULL;
|
||||
if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
if (OB_FAIL(sql_string.assign_fmt(" SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s "
|
||||
"WHERE object_id = %" PRIu64 " && ddl_type IN (%d, %d, %d)", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME,
|
||||
@ -2734,7 +2734,7 @@ int ObDDLTaskRecordOperator::check_has_conflict_ddl(
|
||||
ObSqlString sql_string;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
sqlclient::ObMySQLResult *result = nullptr;
|
||||
if (OB_FAIL(sql_string.assign_fmt("SELECT tenant_id, task_id, object_id, target_object_id, ddl_type,"
|
||||
if (OB_FAIL(sql_string.assign_fmt("SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type,"
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id,"
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s "
|
||||
"WHERE tenant_id = %lu AND object_id = %lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME,
|
||||
@ -2842,7 +2842,7 @@ int ObDDLTaskRecordOperator::get_ddl_task_record(const int64_t task_id,
|
||||
if (OB_UNLIKELY(!proxy.is_inited())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s WHERE task_id=%lu", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME, task_id))) {
|
||||
LOG_WARN("assign sql string failed", K(ret), K(task_id));
|
||||
@ -2869,7 +2869,7 @@ int ObDDLTaskRecordOperator::get_all_ddl_task_record(common::ObMySQLProxy &proxy
|
||||
if (OB_UNLIKELY(!proxy.is_inited())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(proxy.is_inited()));
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
} else if (OB_FAIL(sql_string.assign_fmt(" SELECT time_to_usec(gmt_create) AS create_time, tenant_id, task_id, object_id, target_object_id, ddl_type, "
|
||||
"schema_version, parent_task_id, trace_id, status, snapshot_version, task_version, execution_id, "
|
||||
"UNHEX(ddl_stmt_str) as ddl_stmt_str_unhex, ret_code, UNHEX(message) as message_unhex FROM %s ", OB_ALL_VIRTUAL_DDL_TASK_STATUS_TNAME))) {
|
||||
LOG_WARN("assign sql string failed", K(ret));
|
||||
@ -3016,6 +3016,8 @@ int ObDDLTaskRecordOperator::fill_task_record(
|
||||
ObString ddl_stmt_str;
|
||||
char *buf_ddl_stmt_str = nullptr;
|
||||
char *buf_task_message = nullptr;
|
||||
|
||||
EXTRACT_INT_FIELD_MYSQL(*result_row, "create_time", task_record.gmt_create_, uint64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result_row, "task_id", task_record.task_id_, uint64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result_row, "parent_task_id", task_record.parent_task_id_, uint64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result_row, "tenant_id", task_record.tenant_id_, uint64_t);
|
||||
|
@ -76,6 +76,7 @@ public:
|
||||
static const int64_t MAX_MESSAGE_LENGTH = 4096;
|
||||
typedef common::ObFixedLengthString<MAX_MESSAGE_LENGTH> TaskMessage;
|
||||
public:
|
||||
uint64_t gmt_create_;
|
||||
int64_t task_id_;
|
||||
int64_t parent_task_id_;
|
||||
share::ObDDLType ddl_type_;
|
||||
@ -462,6 +463,8 @@ public:
|
||||
int64_t get_parent_task_id() const { return parent_task_id_; }
|
||||
int64_t get_task_version() const { return task_version_; }
|
||||
int64_t get_parallelism() const { return parallelism_; }
|
||||
uint64_t get_gmt_create() const { return gmt_create_; }
|
||||
void set_gmt_create(uint64_t gmt_create) { gmt_create_ = gmt_create; }
|
||||
static int deep_copy_table_arg(common::ObIAllocator &allocator,
|
||||
const obrpc::ObDDLArg &source_arg,
|
||||
obrpc::ObDDLArg &dest_arg);
|
||||
@ -575,6 +578,7 @@ protected:
|
||||
TraceId sys_task_id_;
|
||||
int64_t err_code_occurence_cnt_; // occurence count for all error return codes not in white list.
|
||||
share::ObDDLLongopsStat *longops_stat_;
|
||||
uint64_t gmt_create_;
|
||||
ObDDLTaskStatInfo stat_info_;
|
||||
int64_t delay_schedule_time_;
|
||||
int64_t next_schedule_ts_;
|
||||
|
@ -18,6 +18,7 @@ namespace oceanbase
|
||||
namespace rootserver
|
||||
{
|
||||
class ObDDLTask;
|
||||
class ObDDLTaskRecord;
|
||||
}
|
||||
namespace share
|
||||
{
|
||||
|
@ -76,7 +76,6 @@ int ObLongopsMgr::register_longops(ObILongopsStat *stat)
|
||||
} else {
|
||||
ObBucketHashWLockGuard guard(bucket_lock_, stat->get_longops_key().hash());
|
||||
if (OB_FAIL(map_.set_refactored(stat->get_longops_key(), stat))) {
|
||||
LOG_WARN("failed to set map", K(ret), KPC(stat));
|
||||
if (OB_HASH_EXIST == ret) {
|
||||
ret = OB_ENTRY_EXIST;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user