fix drop tenant maybe hung
This commit is contained in:
@ -75,7 +75,7 @@ void ObPrimaryLSService::do_work()
|
|||||||
while (!has_set_stop()) {
|
while (!has_set_stop()) {
|
||||||
tenant_schema.reset();
|
tenant_schema.reset();
|
||||||
ObCurTraceId::init(GCONF.self_addr_);
|
ObCurTraceId::init(GCONF.self_addr_);
|
||||||
|
DEBUG_SYNC(STOP_PRIMARY_LS_THREAD);
|
||||||
if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema))) {
|
if (OB_FAIL(get_tenant_schema(tenant_id_, tenant_schema))) {
|
||||||
LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
|
LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id_));
|
||||||
} else {
|
} else {
|
||||||
|
@ -67,55 +67,6 @@ void ObTenantBalanceService::destroy()
|
|||||||
inited_ = false;
|
inited_ = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTenantBalanceService::wait_tenant_and_version_ready_()
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
if (OB_UNLIKELY(!inited_)) {
|
|
||||||
ret = OB_NOT_INIT;
|
|
||||||
LOG_WARN("not init", KR(ret));
|
|
||||||
} else if (OB_ISNULL(GCTX.schema_service_)) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("schema ptr is null", KR(ret), KP(GCTX.schema_service_));
|
|
||||||
} else {
|
|
||||||
bool is_ready = false;
|
|
||||||
uint64_t tenant_data_version = 0;
|
|
||||||
while (!is_ready && !has_set_stop()) {
|
|
||||||
ret = OB_SUCCESS;
|
|
||||||
share::schema::ObSchemaGetterGuard schema_guard;
|
|
||||||
const share::schema::ObTenantSchema *tenant_schema = NULL;
|
|
||||||
|
|
||||||
if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, tenant_data_version))) {
|
|
||||||
LOG_WARN("failed to get min data version", KR(ret), K(tenant_id_));
|
|
||||||
} else if (tenant_data_version < DATA_VERSION_4_2_0_0) {
|
|
||||||
ret = OB_NEED_WAIT;
|
|
||||||
WSTAT("tenant version not target, need wait", KR(ret), K(tenant_data_version));
|
|
||||||
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(OB_SYS_TENANT_ID, schema_guard))) {
|
|
||||||
LOG_WARN("fail to get schema guard", KR(ret));
|
|
||||||
} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) {
|
|
||||||
LOG_WARN("failed to get tenant ids", KR(ret), K(tenant_id_));
|
|
||||||
} else if (OB_ISNULL(tenant_schema)) {
|
|
||||||
ret = OB_TENANT_NOT_EXIST;
|
|
||||||
LOG_WARN("tenant not exist", KR(ret), K(tenant_id_));
|
|
||||||
} else if (!tenant_schema->is_normal()) {
|
|
||||||
ret = OB_NEED_WAIT;
|
|
||||||
WSTAT("tenant schema not ready, no need tenant balance", KR(ret), KPC(tenant_schema));
|
|
||||||
} else {
|
|
||||||
is_ready = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (! is_ready) {
|
|
||||||
idle(10 * 1000 *1000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (has_set_stop()) {
|
|
||||||
WSTAT("thread has been stopped", K(is_ready), K(tenant_id_));
|
|
||||||
ret = OB_IN_STOP_STATE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ObTenantBalanceService::balance_primary_zone_()
|
int ObTenantBalanceService::balance_primary_zone_()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
@ -97,7 +97,6 @@ private:
|
|||||||
int persist_job_and_task_(const share::ObBalanceJob &job,
|
int persist_job_and_task_(const share::ObBalanceJob &job,
|
||||||
ObArray<share::ObBalanceTask> &tasks);
|
ObArray<share::ObBalanceTask> &tasks);
|
||||||
int construct_dependency_of_each_task_(ObArray<share::ObBalanceTask> &tasks);
|
int construct_dependency_of_each_task_(ObArray<share::ObBalanceTask> &tasks);
|
||||||
int wait_tenant_and_version_ready_();
|
|
||||||
int lock_and_check_balance_job_(common::ObMySQLTransaction &trans, const uint64_t tenant_id);
|
int lock_and_check_balance_job_(common::ObMySQLTransaction &trans, const uint64_t tenant_id);
|
||||||
int balance_primary_zone_();
|
int balance_primary_zone_();
|
||||||
int try_update_job_comment_(const share::ObBalanceJob &job, const common::ObSqlString &comment);
|
int try_update_job_comment_(const share::ObBalanceJob &job, const common::ObSqlString &comment);
|
||||||
|
@ -209,7 +209,7 @@ int ObTenantThreadHelper::wait_tenant_schema_and_version_ready_(
|
|||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(get_tenant_schema(tenant_id, tenant_schema))) {
|
if (OB_FAIL(get_tenant_schema(tenant_id, tenant_schema))) {
|
||||||
LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id));
|
LOG_WARN("failed to get tenant schema", KR(ret), K(tenant_id));
|
||||||
} else if (!tenant_schema.is_normal()) {
|
} else if (tenant_schema.is_creating()) {
|
||||||
ret = OB_NEED_WAIT;
|
ret = OB_NEED_WAIT;
|
||||||
LOG_WARN("tenant schema not ready, no need tenant balance", KR(ret), K(tenant_schema));
|
LOG_WARN("tenant schema not ready, no need tenant balance", KR(ret), K(tenant_schema));
|
||||||
} else {
|
} else {
|
||||||
|
@ -545,6 +545,7 @@ class ObString;
|
|||||||
ACT(BEFORE_BUILD_LS_MIGRATION_DAG_NET,)\
|
ACT(BEFORE_BUILD_LS_MIGRATION_DAG_NET,)\
|
||||||
ACT(AFTER_JOIN_LEARNER_LIST,)\
|
ACT(AFTER_JOIN_LEARNER_LIST,)\
|
||||||
ACT(BEFORE_TRANSFER_START_COMMIT,)\
|
ACT(BEFORE_TRANSFER_START_COMMIT,)\
|
||||||
|
ACT(STOP_PRIMARY_LS_THREAD,)\
|
||||||
ACT(MAX_DEBUG_SYNC_POINT,)
|
ACT(MAX_DEBUG_SYNC_POINT,)
|
||||||
|
|
||||||
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);
|
||||||
|
Reference in New Issue
Block a user