fix some problems.
This commit is contained in:
@ -2432,7 +2432,7 @@ int ObSyncTabletAutoincSeqCtx::sync()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_LS_NOT_EXIST == ret || is_location_service_renew_error(ret)) {
|
if (share::ObIDDLTask::in_ddl_retry_white_list(ret)) {
|
||||||
need_renew_location_ = true;
|
need_renew_location_ = true;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -979,11 +979,10 @@ int ObDDLTask::switch_status(const ObDDLTaskStatus new_status, const bool enable
|
|||||||
if (OB_ISNULL(root_service = GCTX.root_service_)) {
|
if (OB_ISNULL(root_service = GCTX.root_service_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("error unexpected, root service must not be nullptr", K(ret));
|
LOG_WARN("error unexpected, root service must not be nullptr", K(ret));
|
||||||
} else if (OB_FAIL(ObDDLUtil::check_tenant_status_normal(&root_service->get_sql_proxy(), dst_tenant_id_))
|
} else if (OB_FAIL(ObDDLUtil::check_tenant_status_normal(&root_service->get_sql_proxy(), dst_tenant_id_))) {
|
||||||
|| OB_FAIL(ObDDLUtil::check_tenant_status_normal(&root_service->get_sql_proxy(), tenant_id_))) {
|
|
||||||
if (OB_TENANT_HAS_BEEN_DROPPED == ret || OB_STANDBY_READ_ONLY == ret) {
|
if (OB_TENANT_HAS_BEEN_DROPPED == ret || OB_STANDBY_READ_ONLY == ret) {
|
||||||
need_retry_ = false;
|
need_retry_ = false;
|
||||||
LOG_INFO("tenant status is abnormal, exit anyway", K(ret), K(task_id_), K(parent_task_id_), K(tenant_id_), K(dst_tenant_id_));
|
LOG_INFO("tenant status is abnormal, exit anyway", K(ret), K_(task_id), K_(parent_task_id), K_(dst_tenant_id));
|
||||||
}
|
}
|
||||||
} else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), dst_tenant_id_))) {
|
} else if (OB_FAIL(trans.start(&root_service->get_sql_proxy(), dst_tenant_id_))) {
|
||||||
LOG_WARN("start transaction failed", K(ret));
|
LOG_WARN("start transaction failed", K(ret));
|
||||||
@ -1059,33 +1058,17 @@ int ObDDLTask::refresh_status()
|
|||||||
int ObDDLTask::refresh_schema_version()
|
int ObDDLTask::refresh_schema_version()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObMultiVersionSchemaService &schema_service = ObMultiVersionSchemaService::get_instance();
|
|
||||||
if (OB_UNLIKELY(!is_inited_)) {
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("ObDDLTask has not been inited", K(ret));
|
LOG_WARN("ObDDLTask has not been inited", K(ret));
|
||||||
}
|
} else if (OB_FAIL(ObDDLUtil::check_schema_version_refreshed(tenant_id_, schema_version_))) {
|
||||||
|
if (OB_SCHEMA_EAGAIN != ret) {
|
||||||
if (OB_SUCC(ret) && (schema_version_ > 0 && schema_version_ != UINT64_MAX)) {
|
LOG_WARN("check schema version refreshed failed", K(ret), K_(tenant_id), K_(schema_version));
|
||||||
int64_t refreshed_schema_version = 0;
|
|
||||||
if (OB_FAIL(schema_service.get_tenant_refreshed_schema_version(tenant_id_, refreshed_schema_version))) {
|
|
||||||
LOG_WARN("get refreshed schema version failed", K(ret), K(tenant_id_));
|
|
||||||
} else if (!ObSchemaService::is_formal_version(refreshed_schema_version) || refreshed_schema_version < schema_version_) {
|
|
||||||
ret = OB_SCHEMA_EAGAIN;
|
|
||||||
if (REACH_TIME_INTERVAL(1000L * 1000L)) {
|
|
||||||
LOG_INFO("tenant schema not refreshed to the target version", K(ret), K(tenant_id_), K(schema_version_), K(refreshed_schema_version));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
} else if (tenant_id_ == dst_tenant_id_) {
|
||||||
|
} else if (OB_FAIL(ObDDLUtil::check_schema_version_refreshed(dst_tenant_id_, dst_schema_version_))) {
|
||||||
if (OB_SUCC(ret) && (tenant_id_ != dst_tenant_id_) && (dst_schema_version_ > 0 && dst_schema_version_ != UINT64_MAX)) {
|
if (OB_SCHEMA_EAGAIN != ret) {
|
||||||
int64_t refreshed_schema_version = 0;
|
LOG_WARN("check schema version refreshed failed", K(ret), K_(dst_tenant_id), K_(dst_schema_version));
|
||||||
if (OB_FAIL(schema_service.get_tenant_refreshed_schema_version(dst_tenant_id_, refreshed_schema_version))) {
|
|
||||||
LOG_WARN("get refreshed schema version failed", K(ret), K(dst_tenant_id_));
|
|
||||||
} else if (!ObSchemaService::is_formal_version(refreshed_schema_version) || refreshed_schema_version < dst_schema_version_) {
|
|
||||||
ret = OB_SCHEMA_EAGAIN;
|
|
||||||
if (REACH_TIME_INTERVAL(1000L * 1000L)) {
|
|
||||||
LOG_INFO("tenant schema not refreshed to the target version", K(ret), K(dst_tenant_id_), K(dst_schema_version_), K(refreshed_schema_version));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -1119,7 +1102,7 @@ int ObDDLTask::report_error_code(const ObString &forward_user_message, const int
|
|||||||
if (OB_UNLIKELY(!is_inited_)) {
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("ObIndexBuildTask has not been inited", K(ret));
|
LOG_WARN("ObIndexBuildTask has not been inited", K(ret));
|
||||||
} else if (OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_table_id(tenant_id_, object_id_, is_oracle_mode))) {
|
} else if (OB_FAIL(ObCompatModeGetter::check_is_oracle_mode_with_table_id(dst_tenant_id_, object_id_, is_oracle_mode))) {
|
||||||
LOG_WARN("check if oracle mode failed", K(ret), K(object_id_));
|
LOG_WARN("check if oracle mode failed", K(ret), K(object_id_));
|
||||||
} else {
|
} else {
|
||||||
ObDDLErrorMessageTableOperator::ObBuildDDLErrorMessage error_message;
|
ObDDLErrorMessageTableOperator::ObBuildDDLErrorMessage error_message;
|
||||||
|
|||||||
@ -161,7 +161,7 @@ int ObRecoverRestoreTableTask::success()
|
|||||||
int ObRecoverRestoreTableTask::fail()
|
int ObRecoverRestoreTableTask::fail()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObArenaAllocator tmp_arena;
|
ObArenaAllocator tmp_arena("RestoreDDLClean");
|
||||||
int64_t rpc_timeout = 0;
|
int64_t rpc_timeout = 0;
|
||||||
int64_t all_orig_index_tablet_count = 0;
|
int64_t all_orig_index_tablet_count = 0;
|
||||||
const ObDatabaseSchema *db_schema = nullptr;
|
const ObDatabaseSchema *db_schema = nullptr;
|
||||||
@ -171,8 +171,8 @@ int ObRecoverRestoreTableTask::fail()
|
|||||||
obrpc::ObTableItem table_item;
|
obrpc::ObTableItem table_item;
|
||||||
obrpc::ObDropTableArg drop_table_arg;
|
obrpc::ObDropTableArg drop_table_arg;
|
||||||
obrpc::ObDDLRes drop_table_res;
|
obrpc::ObDDLRes drop_table_res;
|
||||||
|
bool need_cleanup = true;
|
||||||
{
|
{
|
||||||
ObSchemaGetterGuard src_tenant_schema_guard;
|
|
||||||
ObSchemaGetterGuard dst_tenant_schema_guard;
|
ObSchemaGetterGuard dst_tenant_schema_guard;
|
||||||
if (OB_UNLIKELY(!is_inited_)) {
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -180,16 +180,13 @@ int ObRecoverRestoreTableTask::fail()
|
|||||||
} else if (OB_ISNULL(root_service)) {
|
} else if (OB_ISNULL(root_service)) {
|
||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
LOG_WARN("error sys, root service must not be nullptr", K(ret));
|
||||||
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id_, src_tenant_schema_guard))) {
|
|
||||||
LOG_WARN("get schema guard failed", K(ret), K(tenant_id_));
|
|
||||||
} else if (OB_FAIL(get_orig_all_index_tablet_count(src_tenant_schema_guard, all_orig_index_tablet_count))) {
|
|
||||||
LOG_WARN("get orig all tablet count failed", K(ret));
|
|
||||||
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(dst_tenant_id_, dst_tenant_schema_guard))) {
|
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(dst_tenant_id_, dst_tenant_schema_guard))) {
|
||||||
LOG_WARN("get schema guard failed", K(ret), K(dst_tenant_id_));
|
LOG_WARN("get schema guard failed", K(ret), K(dst_tenant_id_));
|
||||||
} else if (OB_FAIL(dst_tenant_schema_guard.get_table_schema(dst_tenant_id_, target_object_id_, table_schema))) {
|
} else if (OB_FAIL(dst_tenant_schema_guard.get_table_schema(dst_tenant_id_, target_object_id_, table_schema))) {
|
||||||
LOG_WARN("get table schema failed", K(ret), K(dst_tenant_id_), K(target_object_id_));
|
LOG_WARN("get table schema failed", K(ret), K(dst_tenant_id_), K(target_object_id_));
|
||||||
} else if (OB_ISNULL(table_schema)) {
|
} else if (OB_ISNULL(table_schema)) {
|
||||||
// already dropped.
|
// already dropped.
|
||||||
|
need_cleanup = false;
|
||||||
LOG_INFO("already dropped", K(ret), K(dst_tenant_id_), K(target_object_id_));
|
LOG_INFO("already dropped", K(ret), K(dst_tenant_id_), K(target_object_id_));
|
||||||
} else if (OB_FAIL(dst_tenant_schema_guard.get_database_schema(dst_tenant_id_, table_schema->get_database_id(), db_schema))) {
|
} else if (OB_FAIL(dst_tenant_schema_guard.get_database_schema(dst_tenant_id_, table_schema->get_database_id(), db_schema))) {
|
||||||
LOG_WARN("get db schema failed", K(ret), K(dst_tenant_id_), KPC(table_schema));
|
LOG_WARN("get db schema failed", K(ret), K(dst_tenant_id_), KPC(table_schema));
|
||||||
@ -218,11 +215,11 @@ int ObRecoverRestoreTableTask::fail()
|
|||||||
drop_table_arg.compat_mode_ = is_oracle_mode ? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
|
drop_table_arg.compat_mode_ = is_oracle_mode ? lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret) && need_cleanup) {
|
||||||
obrpc::ObCommonRpcProxy common_rpc_proxy = root_service->get_common_rpc_proxy().to(GCTX.self_addr()).timeout(rpc_timeout);
|
|
||||||
if (OB_FAIL(drop_table_arg.tables_.push_back(table_item))) {
|
if (OB_FAIL(drop_table_arg.tables_.push_back(table_item))) {
|
||||||
LOG_WARN("push back failed", K(ret), K(drop_table_arg));
|
LOG_WARN("push back failed", K(ret), K(drop_table_arg));
|
||||||
} else if (OB_FAIL(common_rpc_proxy.drop_table(drop_table_arg, drop_table_res))) {
|
} else if (OB_FAIL(root_service->get_common_rpc_proxy().to(GCTX.self_addr())
|
||||||
|
.timeout(rpc_timeout).drop_table(drop_table_arg, drop_table_res))) {
|
||||||
LOG_WARN("drop table failed", K(ret), K(rpc_timeout), K(drop_table_arg));
|
LOG_WARN("drop table failed", K(ret), K(rpc_timeout), K(drop_table_arg));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -234,6 +231,43 @@ int ObRecoverRestoreTableTask::fail()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObRecoverRestoreTableTask::check_health()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObRootService *root_service = GCTX.root_service_;
|
||||||
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
LOG_WARN("not inited", K(ret));
|
||||||
|
} else if (OB_ISNULL(root_service)) {
|
||||||
|
ret = OB_ERR_SYS;
|
||||||
|
LOG_WARN("error sys", K(ret));
|
||||||
|
} else if (!root_service->in_service()) {
|
||||||
|
ret = OB_STATE_NOT_MATCH;
|
||||||
|
LOG_WARN("root service not in service, do not need retry", K(ret), K(object_id_), K(target_object_id_));
|
||||||
|
need_retry_ = false;
|
||||||
|
} else if (OB_FAIL(ObDDLUtil::check_tenant_status_normal(&root_service->get_sql_proxy(), tenant_id_))) {
|
||||||
|
// switch to build failed if the source tenant is been dropped,
|
||||||
|
// in order to remove the destination tenant's persistent task record.
|
||||||
|
if (OB_TENANT_HAS_BEEN_DROPPED == ret) {
|
||||||
|
const ObDDLTaskStatus old_status = static_cast<ObDDLTaskStatus>(task_status_);
|
||||||
|
const ObDDLTaskStatus new_status = ObDDLTaskStatus::FAIL;
|
||||||
|
int tmp_ret = switch_status(new_status, false, ret);
|
||||||
|
LOG_INFO("switch status to build_failed", K(ret), K(tmp_ret), K_(task_status), K(old_status), K(new_status));
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
} else if (OB_STANDBY_READ_ONLY == ret) {
|
||||||
|
// do not care about the role of the source tenant is expected.
|
||||||
|
if (OB_FAIL(ObDDLRedefinitionTask::check_health())) {
|
||||||
|
LOG_WARN("check health failed", K(ret), K_(task_status));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG_WARN("check tenant status normal failed", K(ret), K_(tenant_id));
|
||||||
|
}
|
||||||
|
} else if (OB_FAIL(ObDDLRedefinitionTask::check_health())) {
|
||||||
|
LOG_WARN("check health failed", K(ret), K_(task_status));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObRecoverRestoreTableTask::process()
|
int ObRecoverRestoreTableTask::process()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
@ -57,6 +57,8 @@ protected:
|
|||||||
virtual int obtain_snapshot(const share::ObDDLTaskStatus next_task_status) override;
|
virtual int obtain_snapshot(const share::ObDDLTaskStatus next_task_status) override;
|
||||||
virtual int fail() override;
|
virtual int fail() override;
|
||||||
virtual int success() override;
|
virtual int success() override;
|
||||||
|
private:
|
||||||
|
int check_health();
|
||||||
private:
|
private:
|
||||||
static const int64_t OB_RECOVER_RESTORE_TABLE_TASK_VERSION = 1L;
|
static const int64_t OB_RECOVER_RESTORE_TABLE_TASK_VERSION = 1L;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1383,6 +1383,27 @@ int ObDDLUtil::check_tenant_status_normal(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObDDLUtil::check_schema_version_refreshed(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const int64_t target_schema_version)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
int64_t refreshed_schema_version = 0;
|
||||||
|
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || target_schema_version <= 0)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid args", K(ret), K(tenant_id), K(target_schema_version));
|
||||||
|
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_refreshed_schema_version(
|
||||||
|
tenant_id, refreshed_schema_version))) {
|
||||||
|
LOG_WARN("get refreshed schema version failed", K(ret), K(tenant_id), K(refreshed_schema_version));
|
||||||
|
} else if (!ObSchemaService::is_formal_version(refreshed_schema_version) || refreshed_schema_version < target_schema_version) {
|
||||||
|
ret = OB_SCHEMA_EAGAIN;
|
||||||
|
if (REACH_TIME_INTERVAL(1000L * 1000L)) {
|
||||||
|
LOG_INFO("tenant schema not refreshed to the target version", K(ret), K(tenant_id), K(target_schema_version), K(refreshed_schema_version));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
/****************** ObCheckTabletDataComplementOp *************/
|
/****************** ObCheckTabletDataComplementOp *************/
|
||||||
|
|
||||||
int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
|
int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status(
|
||||||
|
|||||||
@ -499,6 +499,9 @@ public:
|
|||||||
static int check_tenant_status_normal(
|
static int check_tenant_status_normal(
|
||||||
ObISQLClient *proxy,
|
ObISQLClient *proxy,
|
||||||
const uint64_t check_tenant_id);
|
const uint64_t check_tenant_id);
|
||||||
|
static int check_schema_version_refreshed(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const int64_t target_schema_version);
|
||||||
private:
|
private:
|
||||||
static int generate_order_by_str(
|
static int generate_order_by_str(
|
||||||
const ObIArray<int64_t> &select_column_ids,
|
const ObIArray<int64_t> &select_column_ids,
|
||||||
|
|||||||
@ -80,7 +80,16 @@ int ObComplementDataParam::init(const ObDDLBuildSingleReplicaRequestArg &arg)
|
|||||||
LOG_WARN("invalid arg", K(ret), K(arg));
|
LOG_WARN("invalid arg", K(ret), K(arg));
|
||||||
} else {
|
} else {
|
||||||
MTL_SWITCH (OB_SYS_TENANT_ID) {
|
MTL_SWITCH (OB_SYS_TENANT_ID) {
|
||||||
if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(
|
if (OB_FAIL(ObDDLUtil::check_schema_version_refreshed(orig_tenant_id, orig_schema_version))) {
|
||||||
|
if (OB_SCHEMA_EAGAIN != ret) {
|
||||||
|
LOG_WARN("check schema version refreshed failed", K(ret), K(orig_tenant_id), K(orig_schema_version));
|
||||||
|
}
|
||||||
|
} else if (orig_tenant_id != dest_tenant_id
|
||||||
|
&& OB_FAIL(ObDDLUtil::check_schema_version_refreshed(dest_tenant_id, dest_schema_version))) {
|
||||||
|
if (OB_SCHEMA_EAGAIN != ret) {
|
||||||
|
LOG_WARN("check schema version refreshed failed", K(ret), K(dest_tenant_id), K(dest_schema_version));
|
||||||
|
}
|
||||||
|
} else if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(
|
||||||
orig_tenant_id, src_tenant_schema_guard, orig_schema_version))) {
|
orig_tenant_id, src_tenant_schema_guard, orig_schema_version))) {
|
||||||
LOG_WARN("fail to get tenant schema guard", K(ret), K(orig_tenant_id), K(orig_schema_version));
|
LOG_WARN("fail to get tenant schema guard", K(ret), K(orig_tenant_id), K(orig_schema_version));
|
||||||
} else if (OB_FAIL(src_tenant_schema_guard.get_tenant_info(orig_tenant_id, tenant_schema))) {
|
} else if (OB_FAIL(src_tenant_schema_guard.get_tenant_info(orig_tenant_id, tenant_schema))) {
|
||||||
|
|||||||
Reference in New Issue
Block a user