ddl task get schema with version
This commit is contained in:
parent
c898272f6c
commit
543ec6c9ea
@ -1752,6 +1752,8 @@ int ObConstraintTask::check_health()
|
||||
need_retry_ = false;
|
||||
} else if (OB_FAIL(refresh_status())) { // refresh task status
|
||||
LOG_WARN("refresh status failed", K(ret));
|
||||
} else if (OB_FAIL(refresh_schema_version())) {
|
||||
LOG_WARN("refresh schema version failed", K(ret));
|
||||
} else {
|
||||
ObMultiVersionSchemaService &schema_service = root_service->get_schema_service();
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
|
@ -1327,6 +1327,8 @@ int ObDDLRedefinitionTask::check_health()
|
||||
need_retry_ = false;
|
||||
} else if (OB_FAIL(refresh_status())) { // refresh task status
|
||||
LOG_WARN("refresh status failed", K(ret));
|
||||
} else if (OB_FAIL(refresh_schema_version())) {
|
||||
LOG_WARN("refresh schema version failed", K(ret));
|
||||
} else {
|
||||
ObMultiVersionSchemaService &schema_service = root_service->get_schema_service();
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
|
@ -447,6 +447,8 @@ int ObDDLRetryTask::check_health()
|
||||
need_retry_ = false;
|
||||
} else if (OB_FAIL(refresh_status())) {
|
||||
LOG_WARN("refresh status failed", K(ret));
|
||||
} else if (OB_FAIL(refresh_schema_version())) {
|
||||
LOG_WARN("refresh schema version failed", K(ret));
|
||||
}
|
||||
if (ObDDLTaskStatus::FAIL == static_cast<ObDDLTaskStatus>(task_status_)
|
||||
|| ObDDLTaskStatus::SUCCESS == static_cast<ObDDLTaskStatus>(task_status_)) {
|
||||
@ -592,4 +594,4 @@ int ObDDLRetryTask::update_task_status_succ(
|
||||
LOG_WARN("update task status failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
@ -364,6 +364,29 @@ int ObDDLTask::refresh_status()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::refresh_schema_version()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObDDLTask has not been inited", K(ret));
|
||||
} else if (schema_version_ > 0 && schema_version_ != UINT64_MAX) {
|
||||
ObMultiVersionSchemaService &schema_service = ObMultiVersionSchemaService::get_instance();
|
||||
int64_t refreshed_schema_version = 0;
|
||||
if (OB_FAIL(schema_service.async_refresh_schema(tenant_id_, schema_version_))) {
|
||||
LOG_WARN("async refresh schema version failed", K(ret), K(tenant_id_), K(schema_version_));
|
||||
} else if (OB_FAIL(schema_service.get_tenant_refreshed_schema_version(tenant_id_, refreshed_schema_version))) {
|
||||
LOG_WARN("get refreshed schema version failed", K(ret), K(tenant_id_));
|
||||
} else if (refreshed_schema_version < schema_version_) {
|
||||
ret = OB_SCHEMA_EAGAIN;
|
||||
if (REACH_TIME_INTERVAL(1000L * 1000L)) {
|
||||
LOG_INFO("tenant schema not refreshed to the target version", K(ret), K(tenant_id_), K(schema_version_), K(refreshed_schema_version));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObDDLTask::remove_task_record()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -299,6 +299,7 @@ public:
|
||||
int convert_to_record(ObDDLTaskRecord &task_record, common::ObIAllocator &allocator);
|
||||
int switch_status(share::ObDDLTaskStatus new_status, const int ret_code);
|
||||
int refresh_status();
|
||||
int refresh_schema_version();
|
||||
int remove_task_record();
|
||||
int report_error_code(const ObString &forward_user_message, const int64_t affected_rows = 0);
|
||||
int wait_trans_end(
|
||||
|
@ -364,6 +364,8 @@ int ObDropIndexTask::check_switch_succ()
|
||||
} else if (OB_ISNULL(root_service_)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("error sys", K(ret));
|
||||
} else if (OB_FAIL(refresh_schema_version())) {
|
||||
LOG_WARN("refresh schema version failed", K(ret));
|
||||
} else if (OB_FAIL(root_service_->get_schema_service().get_tenant_schema_guard(tenant_id_, schema_guard))) {
|
||||
LOG_WARN("get tenant schema failed", K(ret), K(tenant_id_));
|
||||
} else if (OB_FAIL(schema_guard.check_table_exist(tenant_id_, target_object_id_, is_index_exist))) {
|
||||
|
@ -395,6 +395,8 @@ int ObIndexBuildTask::check_health()
|
||||
need_retry_ = false; // only stop run the task, need not clean up task context
|
||||
} else if (OB_FAIL(refresh_status())) { // refresh task status
|
||||
LOG_WARN("refresh status failed", K(ret));
|
||||
} else if (OB_FAIL(refresh_schema_version())) {
|
||||
LOG_WARN("refresh schema version failed", K(ret));
|
||||
} else {
|
||||
ObMultiVersionSchemaService &schema_service = root_service_->get_schema_service();
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
|
@ -544,6 +544,8 @@ int ObModifyAutoincTask::check_health()
|
||||
need_retry_ = false;
|
||||
} else if (OB_FAIL(refresh_status())) { // refresh task status
|
||||
LOG_WARN("refresh status failed", K(ret));
|
||||
} else if (OB_FAIL(refresh_schema_version())) {
|
||||
LOG_WARN("refresh schema version failed", K(ret));
|
||||
} else {
|
||||
ObMultiVersionSchemaService &schema_service = root_service->get_schema_service();
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
|
Loading…
x
Reference in New Issue
Block a user