support drop standby tenant
This commit is contained in:
		@ -186,6 +186,7 @@ int ObRecoveryLSService::process_ls_log_(const SCN &start_scn, PalfBufferIterato
 | 
			
		||||
  palf::LogEntry log_entry;
 | 
			
		||||
  palf::LSN target_lsn;
 | 
			
		||||
  SCN sync_scn;
 | 
			
		||||
  SCN last_sync_scn;
 | 
			
		||||
  if (OB_UNLIKELY(!inited_)) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("not init", KR(ret), K(inited_));
 | 
			
		||||
@ -232,6 +233,15 @@ int ObRecoveryLSService::process_ls_log_(const SCN &start_scn, PalfBufferIterato
 | 
			
		||||
          LOG_WARN("failed to process ls tx log", KR(ret), K(tx_log_block), K(sync_scn));
 | 
			
		||||
        }
 | 
			
		||||
      } else {}
 | 
			
		||||
      if (OB_SUCC(ret)) {
 | 
			
		||||
        last_sync_scn = sync_scn;
 | 
			
		||||
      } else if (last_sync_scn.is_valid()) {
 | 
			
		||||
        //if ls_operator can not process, need to report last sync scn
 | 
			
		||||
        int tmp_ret = OB_SUCCESS;
 | 
			
		||||
        if (OB_TMP_FAIL(report_sys_ls_recovery_stat_(last_sync_scn))) {
 | 
			
		||||
          LOG_WARN("failed to report ls recovery stat", KR(ret), KR(tmp_ret), K(last_sync_scn));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }//end for each log
 | 
			
		||||
  if (OB_ITER_END == ret) {
 | 
			
		||||
@ -516,71 +526,28 @@ int ObRecoveryLSService::construct_ls_recovery_stat(const SCN &sync_scn,
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//TODO check other ls is valid to process ls operator
 | 
			
		||||
int ObRecoveryLSService::process_ls_operator_(const share::ObLSAttr &ls_attr, const SCN &sync_scn)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObLSRecoveryStatOperator ls_recovery;
 | 
			
		||||
  common::ObMySQLTransaction trans;
 | 
			
		||||
  const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
 | 
			
		||||
  ObLSStatusOperator ls_operator;
 | 
			
		||||
  share::ObLSStatusInfo ls_status;
 | 
			
		||||
  ObLSLifeAgentManager ls_life_agent(*proxy_);
 | 
			
		||||
  if (OB_UNLIKELY(!ls_attr.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("ls attr is invalid", KR(ret), K(ls_attr));
 | 
			
		||||
  } else if (OB_ISNULL(proxy_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("proxy is null", KR(ret));
 | 
			
		||||
  } else if (OB_FAIL(check_valid_to_operator_ls_(sync_scn))) {
 | 
			
		||||
    LOG_WARN("failed to check valid to operator ls", KR(ret), K(sync_scn));
 | 
			
		||||
  } else if (OB_FAIL(check_valid_to_operator_ls_(ls_attr, sync_scn))) {
 | 
			
		||||
    LOG_WARN("failed to check valid to operator ls", KR(ret), K(sync_scn), K(ls_attr));
 | 
			
		||||
  } else if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) {
 | 
			
		||||
    LOG_WARN("failed to start trans", KR(ret), K(meta_tenant_id));
 | 
			
		||||
  } else if (share::is_ls_create_pre_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
    //create new ls;
 | 
			
		||||
    if (OB_FAIL(create_new_ls_(ls_attr, sync_scn, trans))) {
 | 
			
		||||
      LOG_WARN("failed to create new ls", KR(ret), K(sync_scn), K(ls_attr));
 | 
			
		||||
    }
 | 
			
		||||
  } else if (share::is_ls_create_abort_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
    if (OB_FAIL(ls_life_agent.drop_ls_in_trans(tenant_id_, ls_attr.get_ls_id(), trans))) {
 | 
			
		||||
      LOG_WARN("failed to drop ls", KR(ret), K(tenant_id_), K(ls_attr));
 | 
			
		||||
    }
 | 
			
		||||
  } else if (share::is_ls_drop_end_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
    if (OB_FAIL(ls_life_agent.set_ls_offline_in_trans(tenant_id_, ls_attr.get_ls_id(),
 | 
			
		||||
            ls_attr.get_ls_status(), sync_scn, trans))) {
 | 
			
		||||
      LOG_WARN("failed to set offline", KR(ret), K(tenant_id_), K(ls_attr), K(sync_scn));
 | 
			
		||||
    }
 | 
			
		||||
  } else {
 | 
			
		||||
    ObLSStatus target_status = share::OB_LS_EMPTY;
 | 
			
		||||
    if (OB_FAIL(ls_operator.get_ls_status_info(tenant_id_, ls_attr.get_ls_id(),
 | 
			
		||||
                                               ls_status, trans))) {
 | 
			
		||||
      LOG_WARN("failed to get ls status", KR(ret), K(tenant_id_), K(ls_attr));
 | 
			
		||||
    } else if (ls_status.ls_is_creating()) {
 | 
			
		||||
      ret = OB_EAGAIN;
 | 
			
		||||
      LOG_WARN("ls not created, need wait", KR(ret), K(ls_status));
 | 
			
		||||
    } else if (share::is_ls_create_end_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
      // set ls to normal
 | 
			
		||||
      target_status = share::OB_LS_NORMAL;
 | 
			
		||||
    } else if (share::is_ls_tenant_drop_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
      target_status = share::OB_LS_TENANT_DROPPING;
 | 
			
		||||
    } else if (share::is_ls_drop_pre_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
      target_status = share::OB_LS_DROPPING;
 | 
			
		||||
    } else if (share::is_ls_tenant_drop_pre_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
       target_status = share::OB_LS_PRE_TENANT_DROPPING;
 | 
			
		||||
    } else {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      LOG_WARN("unexpected operation type", KR(ret), K(ls_attr));
 | 
			
		||||
    }
 | 
			
		||||
    if (FAILEDx(ls_operator.update_ls_status(tenant_id_, ls_attr.get_ls_id(),
 | 
			
		||||
                                             ls_status.status_, target_status,
 | 
			
		||||
                                             share::NORMAL_SWITCHOVER_STATUS, trans))) {
 | 
			
		||||
      LOG_WARN("failed to update ls status", KR(ret), K(tenant_id_), K(ls_attr),
 | 
			
		||||
               K(ls_status), K(target_status));
 | 
			
		||||
    }
 | 
			
		||||
    LOG_INFO("[LS_RECOVERY] update ls status", KR(ret), K(ls_attr), K(target_status));
 | 
			
		||||
  } else if (OB_FAIL(process_ls_operator_in_trans_(ls_attr, sync_scn, trans))) {
 | 
			
		||||
    LOG_WARN("failed to process ls operator in trans", KR(ret), K(ls_attr), K(sync_scn));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    ObLSRecoveryStat ls_recovery_stat;
 | 
			
		||||
    ObLSRecoveryStatOperator ls_recovery;
 | 
			
		||||
    if (OB_FAIL(construct_ls_recovery_stat(sync_scn, ls_recovery_stat))) {
 | 
			
		||||
      LOG_WARN("failed to construct ls recovery stat", KR(ret), K(sync_scn));
 | 
			
		||||
    } else if (OB_FAIL(ls_recovery.update_ls_recovery_stat_in_trans(
 | 
			
		||||
@ -598,7 +565,8 @@ int ObRecoveryLSService::process_ls_operator_(const share::ObLSAttr &ls_attr, co
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObRecoveryLSService::check_valid_to_operator_ls_(const SCN &sync_scn)
 | 
			
		||||
int ObRecoveryLSService::check_valid_to_operator_ls_(
 | 
			
		||||
    const share::ObLSAttr &ls_attr, const SCN &sync_scn)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(!inited_)) {
 | 
			
		||||
@ -607,18 +575,24 @@ int ObRecoveryLSService::check_valid_to_operator_ls_(const SCN &sync_scn)
 | 
			
		||||
  } else if (OB_ISNULL(proxy_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("proxy is null", KR(ret));
 | 
			
		||||
  } else if (OB_UNLIKELY(!sync_scn.is_valid())) {
 | 
			
		||||
  } else if (OB_UNLIKELY(!sync_scn.is_valid() || !ls_attr.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("syns scn is invalid", KR(ret), K(sync_scn));
 | 
			
		||||
    LOG_WARN("syns scn is invalid", KR(ret), K(sync_scn), K(ls_attr));
 | 
			
		||||
  } else if (share::is_ls_tenant_drop_pre_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
    ret = OB_ITER_END;
 | 
			
		||||
    LOG_WARN("can not process ls operator after tenant dropping", K(ls_attr));
 | 
			
		||||
  } else if (share::is_ls_tenant_drop_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("ls recovery must stop while pre tenant dropping", KR(ret), K(ls_attr));
 | 
			
		||||
  } else {
 | 
			
		||||
    SCN user_scn;
 | 
			
		||||
    ObLSRecoveryStatOperator ls_recovery;
 | 
			
		||||
    if (OB_FAIL(ls_recovery.get_user_ls_sync_scn(tenant_id_, *proxy_, user_scn))) {
 | 
			
		||||
      LOG_WARN("failed to get user scn", KR(ret), K(tenant_id_));
 | 
			
		||||
    } else if (user_scn >= sync_scn) {
 | 
			
		||||
      //其他日志流越过系统日志流的同步点,可以处理日志流操作了
 | 
			
		||||
      //other ls has larger sync scn, ls can operator
 | 
			
		||||
    } else {
 | 
			
		||||
      ret = OB_EAGAIN;
 | 
			
		||||
      ret = OB_NEED_WAIT;
 | 
			
		||||
      LOG_WARN("can not process ls operator, need wait other ls sync", KR(ret),
 | 
			
		||||
          K(user_scn), K(sync_scn));
 | 
			
		||||
    }
 | 
			
		||||
@ -627,6 +601,71 @@ int ObRecoveryLSService::check_valid_to_operator_ls_(const SCN &sync_scn)
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObRecoveryLSService::process_ls_operator_in_trans_(
 | 
			
		||||
    const share::ObLSAttr &ls_attr, const SCN &sync_scn, ObMySQLTransaction &trans)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id_);
 | 
			
		||||
  if (OB_UNLIKELY(!ls_attr.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("ls attr is invalid", KR(ret), K(ls_attr));
 | 
			
		||||
  } else if (OB_ISNULL(proxy_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("proxy is null", KR(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    ObLSStatusOperator ls_operator;
 | 
			
		||||
    share::ObLSStatusInfo ls_status;
 | 
			
		||||
    ObLSLifeAgentManager ls_life_agent(*proxy_);
 | 
			
		||||
    if (OB_UNLIKELY(!ls_attr.is_valid())) {
 | 
			
		||||
      ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
      LOG_WARN("ls attr is invalid", KR(ret), K(ls_attr));
 | 
			
		||||
    } else if (share::is_ls_create_pre_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
      //create new ls;
 | 
			
		||||
      if (OB_FAIL(create_new_ls_(ls_attr, sync_scn, trans))) {
 | 
			
		||||
        LOG_WARN("failed to create new ls", KR(ret), K(sync_scn), K(ls_attr));
 | 
			
		||||
      }
 | 
			
		||||
    } else if (share::is_ls_create_abort_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
      if (OB_FAIL(ls_life_agent.drop_ls_in_trans(tenant_id_, ls_attr.get_ls_id(), trans))) {
 | 
			
		||||
        LOG_WARN("failed to drop ls", KR(ret), K(tenant_id_), K(ls_attr));
 | 
			
		||||
      }
 | 
			
		||||
    } else if (share::is_ls_drop_end_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
      if (OB_FAIL(ls_life_agent.set_ls_offline_in_trans(tenant_id_, ls_attr.get_ls_id(),
 | 
			
		||||
              ls_attr.get_ls_status(), sync_scn, trans))) {
 | 
			
		||||
        LOG_WARN("failed to set offline", KR(ret), K(tenant_id_), K(ls_attr), K(sync_scn));
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      ObLSStatus target_status = share::OB_LS_EMPTY;
 | 
			
		||||
      if (OB_FAIL(ls_operator.get_ls_status_info(tenant_id_, ls_attr.get_ls_id(),
 | 
			
		||||
              ls_status, trans))) {
 | 
			
		||||
        LOG_WARN("failed to get ls status", KR(ret), K(tenant_id_), K(ls_attr));
 | 
			
		||||
      } else if (ls_status.ls_is_creating()) {
 | 
			
		||||
        ret = OB_EAGAIN;
 | 
			
		||||
        LOG_WARN("ls not created, need wait", KR(ret), K(ls_status));
 | 
			
		||||
      } else if (share::is_ls_create_end_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
        // set ls to normal
 | 
			
		||||
        target_status = share::OB_LS_NORMAL;
 | 
			
		||||
      } else if (share::is_ls_tenant_drop_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
        target_status = share::OB_LS_TENANT_DROPPING;
 | 
			
		||||
      } else if (share::is_ls_drop_pre_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
        target_status = share::OB_LS_DROPPING;
 | 
			
		||||
      } else if (share::is_ls_tenant_drop_pre_op(ls_attr.get_ls_operatin_type())) {
 | 
			
		||||
        target_status = share::OB_LS_PRE_TENANT_DROPPING;
 | 
			
		||||
      } else {
 | 
			
		||||
        ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
        LOG_WARN("unexpected operation type", KR(ret), K(ls_attr));
 | 
			
		||||
      }
 | 
			
		||||
      if (FAILEDx(ls_operator.update_ls_status(tenant_id_, ls_attr.get_ls_id(),
 | 
			
		||||
              ls_status.status_, target_status,
 | 
			
		||||
              share::NORMAL_SWITCHOVER_STATUS, trans))) {
 | 
			
		||||
        LOG_WARN("failed to update ls status", KR(ret), K(tenant_id_), K(ls_attr),
 | 
			
		||||
            K(ls_status), K(target_status));
 | 
			
		||||
      }
 | 
			
		||||
      LOG_INFO("[LS_RECOVERY] update ls status", KR(ret), K(ls_attr), K(target_status));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
int ObRecoveryLSService::create_new_ls_(const share::ObLSAttr &ls_attr,
 | 
			
		||||
                                        const SCN &sync_scn,
 | 
			
		||||
                                        common::ObMySQLTransaction &trans)
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user