fix transfer not check dest ls
This commit is contained in:
@ -158,6 +158,8 @@ int ObLSRecoveryStatHandler::get_all_replica_min_readable_scn(share::SCN &readab
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
//TODO maybe need consider readable scn in inner table
|
//TODO maybe need consider readable scn in inner table
|
||||||
|
ObLSID ls_id = ls_->get_ls_id();
|
||||||
|
LOG_INFO("all ls readable scn", K(ls_id), K(readable_scn), K(replicas_scn_));
|
||||||
}
|
}
|
||||||
if (FAILEDx(get_latest_palf_stat_(palf_stat_second))) {
|
if (FAILEDx(get_latest_palf_stat_(palf_stat_second))) {
|
||||||
LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls));
|
LOG_WARN("get latest palf_stat failed", KR(ret), KPC_(ls));
|
||||||
@ -487,8 +489,8 @@ int ObLSRecoveryStatHandler::gather_replica_readable_scn()
|
|||||||
if (OB_FAIL(replicas_scn_.assign(replicas_scn))) {
|
if (OB_FAIL(replicas_scn_.assign(replicas_scn))) {
|
||||||
LOG_WARN("failed to replicas scn", KR(ret), K(replicas_scn));
|
LOG_WARN("failed to replicas scn", KR(ret), K(replicas_scn));
|
||||||
}
|
}
|
||||||
const int64_t PRINT_INTERVAL = 10 * 1000 * 1000L;
|
const int64_t PRINT_INTERVAL = 1 * 1000 * 1000L;
|
||||||
if (REACH_TIME_INTERVAL(PRINT_INTERVAL)) {
|
if (REACH_TENANT_TIME_INTERVAL(PRINT_INTERVAL)) {
|
||||||
LOG_INFO("ls readable scn in memory", KR(ret), K(ls_id), K(replicas_scn_));
|
LOG_INFO("ls readable scn in memory", KR(ret), K(ls_id), K(replicas_scn_));
|
||||||
} else {
|
} else {
|
||||||
LOG_TRACE("ls readable scn in memory", KR(ret), K(ls_id), K(replicas_scn_));
|
LOG_TRACE("ls readable scn in memory", KR(ret), K(ls_id), K(replicas_scn_));
|
||||||
|
|||||||
@ -1542,7 +1542,7 @@ int ObLSServiceHelper::check_transfer_task_replay(const uint64_t tenant_id,
|
|||||||
LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(src_ls), K(transfer_scn));
|
LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(src_ls), K(transfer_scn));
|
||||||
} else if (!replay_finish) {
|
} else if (!replay_finish) {
|
||||||
LOG_WARN("src ls has not replay transfer finish", K(tenant_id), K(src_ls));
|
LOG_WARN("src ls has not replay transfer finish", K(tenant_id), K(src_ls));
|
||||||
} else if (OB_FAIL(check_ls_transfer_replay_(tenant_id, src_ls, transfer_scn, replay_finish))) {
|
} else if (OB_FAIL(check_ls_transfer_replay_(tenant_id, dest_ls, transfer_scn, replay_finish))) {
|
||||||
LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(dest_ls), K(transfer_scn));
|
LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(dest_ls), K(transfer_scn));
|
||||||
} else if (!replay_finish) {
|
} else if (!replay_finish) {
|
||||||
LOG_WARN("dest ls has not replay transfer finish", K(tenant_id), K(dest_ls));
|
LOG_WARN("dest ls has not replay transfer finish", K(tenant_id), K(dest_ls));
|
||||||
@ -1623,6 +1623,7 @@ int ObLSServiceHelper::get_ls_all_replica_readable_scn_(const uint64_t tenant_id
|
|||||||
LOG_WARN("result is null", KR(ret), K(tenant_id), K(leader), K(ls_id));
|
LOG_WARN("result is null", KR(ret), K(tenant_id), K(leader), K(ls_id));
|
||||||
} else {
|
} else {
|
||||||
readable_scn = proxy.get_results().at(0)->get_cur_readable_scn();
|
readable_scn = proxy.get_results().at(0)->get_cur_readable_scn();
|
||||||
|
LOG_INFO("get all replica readable scn", K(ls_id), K(readable_scn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -1228,6 +1228,12 @@ int ObRecoveryLSService::try_do_ls_balance_task_(
|
|||||||
LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
|
LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
|
||||||
} else {
|
} else {
|
||||||
LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
|
LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
|
||||||
|
ROOTSERVICE_EVENT_ADD("standby_tenant", "remove_balance_task",
|
||||||
|
K_(tenant_id), "task_type", ls_balance_task.get_task_op(),
|
||||||
|
"task_scn", ls_balance_task.get_operation_scn(),
|
||||||
|
"switchover_status", tenant_info.get_switchover_status(),
|
||||||
|
"src_ls", ls_balance_task.get_src_ls(),
|
||||||
|
"dest_ls", ls_balance_task.get_dest_ls());
|
||||||
}
|
}
|
||||||
END_TRANSACTION(trans)
|
END_TRANSACTION(trans)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -575,6 +575,7 @@ void ObAllTenantInfoCache::reset()
|
|||||||
ora_rowscn_ = 0;
|
ora_rowscn_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ERRSIM_POINT_DEF(ERRSIM_UPDATE_TENANT_INFO_CACHE_ERROR);
|
||||||
int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id,
|
int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id,
|
||||||
common::ObMySQLProxy *sql_proxy,
|
common::ObMySQLProxy *sql_proxy,
|
||||||
bool &content_changed)
|
bool &content_changed)
|
||||||
@ -602,7 +603,9 @@ int ObAllTenantInfoCache::refresh_tenant_info(const uint64_t tenant_id,
|
|||||||
* This also ensures the consistency of tenant_role cache and the tenant role field in all_tenant_info
|
* This also ensures the consistency of tenant_role cache and the tenant role field in all_tenant_info
|
||||||
*/
|
*/
|
||||||
SpinWLockGuard guard(lock_);
|
SpinWLockGuard guard(lock_);
|
||||||
if (ora_rowscn >= ora_rowscn_) {
|
if (OB_UNLIKELY(ERRSIM_UPDATE_TENANT_INFO_CACHE_ERROR)) {
|
||||||
|
ret = ERRSIM_UPDATE_TENANT_INFO_CACHE_ERROR;
|
||||||
|
} else if (ora_rowscn >= ora_rowscn_) {
|
||||||
if (ora_rowscn > ora_rowscn_) {
|
if (ora_rowscn > ora_rowscn_) {
|
||||||
MTL_SET_TENANT_ROLE_CACHE(new_tenant_info.get_tenant_role().value());
|
MTL_SET_TENANT_ROLE_CACHE(new_tenant_info.get_tenant_role().value());
|
||||||
(void)tenant_info_.assign(new_tenant_info);
|
(void)tenant_info_.assign(new_tenant_info);
|
||||||
@ -637,6 +640,8 @@ int ObAllTenantInfoCache::update_tenant_info_cache(
|
|||||||
if (!new_tenant_info.is_valid() || 0 == new_ora_rowscn || INT64_MAX == new_ora_rowscn) {
|
if (!new_tenant_info.is_valid() || 0 == new_ora_rowscn || INT64_MAX == new_ora_rowscn) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(new_tenant_info), K(new_ora_rowscn));
|
LOG_WARN("invalid argument", KR(ret), K(new_tenant_info), K(new_ora_rowscn));
|
||||||
|
} else if (OB_UNLIKELY(ERRSIM_UPDATE_TENANT_INFO_CACHE_ERROR)) {
|
||||||
|
ret = ERRSIM_UPDATE_TENANT_INFO_CACHE_ERROR;
|
||||||
} else {
|
} else {
|
||||||
SpinWLockGuard guard(lock_);
|
SpinWLockGuard guard(lock_);
|
||||||
if (!tenant_info_.is_valid() || 0 == ora_rowscn_) {
|
if (!tenant_info_.is_valid() || 0 == ora_rowscn_) {
|
||||||
|
|||||||
@ -1342,7 +1342,11 @@ int ObTenantRoleTransitionService::get_checkpoints_by_rpc(const uint64_t tenant_
|
|||||||
ObGetLSSyncScnProxy proxy(
|
ObGetLSSyncScnProxy proxy(
|
||||||
*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_sync_scn);
|
*GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_sync_scn);
|
||||||
obrpc::ObGetLSSyncScnArg arg;
|
obrpc::ObGetLSSyncScnArg arg;
|
||||||
const uint64_t group_id = share::OBCG_DBA_COMMAND;
|
//由于在check_sync_to_latest,需要给上游发RPC或者SQL获取准确的end_scn,所以会存在嵌套
|
||||||
|
//RPC的概率,OBCG_DBA_COMMAND这个队列是需要的时候创建,个数和租户的CPU相关,如果发生
|
||||||
|
//嵌套RPC的话,可能会出现资源型饿死的可能性。
|
||||||
|
//在不需要检查check_sync_to_latest使用OBCG_DBA_COMMAND,否则为了避免嵌套RPC,使用NORMAL队列
|
||||||
|
const uint64_t group_id = check_sync_to_latest ? 0 : share::OBCG_DBA_COMMAND;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count(); ++i) {
|
||||||
const ObLSStatusInfo &info = status_info_array.at(i);
|
const ObLSStatusInfo &info = status_info_array.at(i);
|
||||||
const int64_t timeout_us = !THIS_WORKER.is_timeout_ts_valid() ?
|
const int64_t timeout_us = !THIS_WORKER.is_timeout_ts_valid() ?
|
||||||
|
|||||||
Reference in New Issue
Block a user