gather readble scn in memory
This commit is contained in:
		@ -1396,5 +1396,113 @@ int ObTenantLSInfo::get_next_primary_zone(
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
int ObLSServiceHelper::check_transfer_task_replay(const uint64_t tenant_id,
 | 
			
		||||
      const share::ObLSID &src_ls,
 | 
			
		||||
      const share::ObLSID &dest_ls,
 | 
			
		||||
      const share::SCN &transfer_scn,
 | 
			
		||||
      bool &replay_finish)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObLSStatusOperator ls_operator;
 | 
			
		||||
  share::ObLSStatusInfo ls_status;
 | 
			
		||||
  SCN readble_scn;
 | 
			
		||||
  replay_finish = true;
 | 
			
		||||
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
 | 
			
		||||
        || !src_ls.is_valid() || !dest_ls.is_valid()
 | 
			
		||||
        || !transfer_scn.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(src_ls),
 | 
			
		||||
        K(dest_ls), K(transfer_scn));
 | 
			
		||||
  } else if (OB_FAIL(check_ls_transfer_replay_(tenant_id, src_ls, transfer_scn, replay_finish))) {
 | 
			
		||||
    LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(src_ls), K(transfer_scn));
 | 
			
		||||
  } else if (!replay_finish) {
 | 
			
		||||
    LOG_WARN("src ls has not replay transfer finish", K(tenant_id), K(src_ls));
 | 
			
		||||
  } else if (OB_FAIL(check_ls_transfer_replay_(tenant_id, src_ls, transfer_scn, replay_finish))) {
 | 
			
		||||
    LOG_WARN("failed to check ls transfer replay", KR(ret), K(tenant_id), K(dest_ls), K(transfer_scn));
 | 
			
		||||
  } else if (!replay_finish) {
 | 
			
		||||
    LOG_WARN("dest ls has not replay transfer finish", K(tenant_id), K(dest_ls));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObLSServiceHelper::check_ls_transfer_replay_(const uint64_t tenant_id,
 | 
			
		||||
      const share::ObLSID &ls_id,
 | 
			
		||||
      const share::SCN &transfer_scn,
 | 
			
		||||
      bool &replay_finish)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObLSStatusOperator ls_operator;
 | 
			
		||||
  share::ObLSStatusInfo ls_status;
 | 
			
		||||
  SCN readable_scn;
 | 
			
		||||
  replay_finish = true;
 | 
			
		||||
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
 | 
			
		||||
        || !ls_id.is_valid()
 | 
			
		||||
        || !transfer_scn.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(ls_id), K(transfer_scn));
 | 
			
		||||
  } else if (OB_ISNULL(GCTX.sql_proxy_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("ptr is null", KR(ret), KP(GCTX.sql_proxy_));
 | 
			
		||||
  } else if (OB_FAIL(ls_operator.get_ls_status_info(tenant_id, ls_id,
 | 
			
		||||
          ls_status, *GCTX.sql_proxy_))) {
 | 
			
		||||
    if (OB_ENTRY_NOT_EXIST == ret) {
 | 
			
		||||
      ret = OB_SUCCESS;
 | 
			
		||||
      LOG_INFO("src ls not exist, no need check", K(tenant_id), K(ls_id));
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_WARN("failed to get ls status info", KR(ret), K(tenant_id), K(ls_id));
 | 
			
		||||
    }
 | 
			
		||||
  } else if (OB_FAIL(get_ls_all_replica_readable_scn_(tenant_id, ls_id, readable_scn))) {
 | 
			
		||||
    LOG_WARN("failed to get ls all replica readable scn", KR(ret), K(tenant_id), K(ls_id));
 | 
			
		||||
  } else if (readable_scn < transfer_scn) {
 | 
			
		||||
    replay_finish = false;
 | 
			
		||||
    LOG_INFO("need wait, ls has not replay finish transfer", K(tenant_id),
 | 
			
		||||
        K(ls_id), K(readable_scn), K(transfer_scn));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObLSServiceHelper::get_ls_all_replica_readable_scn_(const uint64_t tenant_id,
 | 
			
		||||
      const share::ObLSID &ls_id,
 | 
			
		||||
      share::SCN &readable_scn)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObTimeoutCtx ctx;
 | 
			
		||||
  if (OB_UNLIKELY(!ls_id.is_valid() || !is_valid_tenant_id(tenant_id))) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", KR(ret), K(ls_id), K(tenant_id));
 | 
			
		||||
  } else if (OB_ISNULL(GCTX.location_service_) || OB_ISNULL(GCTX.srv_rpc_proxy_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("location service is null", KR(ret), KP(GCTX.location_service_),
 | 
			
		||||
        KP(GCTX.srv_rpc_proxy_));
 | 
			
		||||
  } else if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
 | 
			
		||||
    LOG_WARN("fail to set timeout ctx", KR(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    obrpc::ObGetLSReplayedScnArg arg;
 | 
			
		||||
    ObAddr leader;
 | 
			
		||||
    ObGetLSReplayedScnRes result;
 | 
			
		||||
    const int64_t timeout = ctx.get_timeout();
 | 
			
		||||
    ObGetLSReplayedScnProxy proxy(
 | 
			
		||||
        *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_replayed_scn);
 | 
			
		||||
    if (OB_FAIL(arg.init(tenant_id, ls_id, true))) {
 | 
			
		||||
      LOG_WARN("failed to init arg", KR(ret), K(tenant_id), K(ls_id));
 | 
			
		||||
    } else if (OB_FAIL(GCTX.location_service_->get_leader(
 | 
			
		||||
            GCONF.cluster_id, tenant_id, ls_id, false, leader))) {
 | 
			
		||||
      LOG_WARN("failed to get leader", KR(ret), K(tenant_id), K(ls_id));
 | 
			
		||||
    } else if (OB_FAIL(proxy.call(leader, timeout, tenant_id, arg))) {
 | 
			
		||||
      LOG_WARN("failed to get ls repalyed scn", KR(ret), K(leader), K(tenant_id),
 | 
			
		||||
          K(timeout), K(arg));
 | 
			
		||||
    } else if (OB_FAIL(proxy.wait())) {
 | 
			
		||||
      LOG_WARN("failed to get wait all rpc", KR(ret), K(tenant_id), K(ls_id), K(leader));
 | 
			
		||||
    } else if (OB_ISNULL(proxy.get_results().at(0))) {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      LOG_WARN("result is null", KR(ret), K(tenant_id), K(leader), K(ls_id));
 | 
			
		||||
    } else {
 | 
			
		||||
      readable_scn = proxy.get_results().at(0)->get_cur_readable_scn();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}//end of rootserver
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user