[CP] create ls check offline_scn
@@ -13,6 +13,7 @@
#define USING_LOG_PREFIX BALANCE
#include "ob_balance_task_execute_service.h"
#include "lib/mysqlclient/ob_mysql_transaction.h"//trans
#include "lib/utility/ob_tracepoint.h" // ERRSIM_POINT_DEF
#include "share/schema/ob_schema_struct.h"//ObTenantInfo
#include "share/schema/ob_multi_version_schema_service.h"//ObMultiVersionSchemaService
#include "share/schema/ob_part_mgr_util.h"//ObPartitionSchemaIter
@@ -23,6 +24,7 @@
#include "share/ls/ob_ls_operator.h"//ls_op
#include "share/ls/ob_ls_status_operator.h"//status_op
#include "share/ls/ob_ls_table_operator.h"//lst_operator->get
#include "share/rpc/ob_async_rpc_proxy.h"//wait_all
#include "rootserver/ob_tenant_transfer_service.h"//transfer
#include "rootserver/balance/ob_ls_all_part_builder.h"   // ObLSAllPartBuilder
#include "rootserver/ob_root_utils.h"//get_rs_default_timeout_ctx
@@ -265,8 +267,8 @@ int ObBalanceTaskExecuteService::process_current_task_status_(
  } else {
    if (task.get_task_status().is_init()) {
      DEBUG_SYNC(BEFORE_PROCESS_BALANCE_TASK_INIT);
      if (OB_FAIL(process_init_task_(task, trans))) {
        LOG_WARN("failed to init trans", KR(ret));
      if (OB_FAIL(process_init_task_(task, trans, skip_next_status))) {
        LOG_WARN("failed to init trans", KR(ret), K(task));
      }
    } else if (task.get_task_status().is_create_ls()) {
      DEBUG_SYNC(BEFORE_PROCESS_BALANCE_TASK_CREATE_LS);
@@ -544,9 +546,11 @@ int ObBalanceTaskExecuteService::cancel_other_init_task_(
}

int ObBalanceTaskExecuteService::process_init_task_(const ObBalanceTask &task,
                                                    ObMySQLTransaction &trans)
                                                    ObMySQLTransaction &trans,
                                                    bool &skip_next_status)
{
  int ret = OB_SUCCESS;
  skip_next_status = false;
  ObLSAttrOperator ls_op(tenant_id_, sql_proxy_);
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
@@ -559,11 +563,18 @@ int ObBalanceTaskExecuteService::process_init_task_(const ObBalanceTask &task,
    share::ObLSAttr ls_info;
    share::ObLSFlag flag;
    SCN create_scn;
    if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) {
      LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_));
    if (OB_FAIL(wait_can_create_new_ls_(create_scn))) {
      LOG_WARN("failed to wait create new ls", KR(ret), K(tenant_id_));
      if (OB_NEED_WAIT == ret && !task_comment_.empty()) {
        //To make sure task_comment can be updated to the table, reset the error code first,
        //but skip log stream creation by setting skip_next_status to true.
        ret = OB_SUCCESS;
        skip_next_status = true;
      }
    } else if (OB_FAIL(ls_info.init(task.get_dest_ls_id(), task.get_ls_group_id(), flag,
                             share::OB_LS_CREATING, share::OB_LS_OP_CREATE_PRE, create_scn))) {
      LOG_WARN("failed to init new operation", KR(ret), K(create_scn), K(task));
      LOG_WARN("failed to init new operation", KR(ret), K(create_scn), K(task),
          K(skip_next_status), K(task_comment_));
      //TODO msy164651
    } else if (OB_FAIL(ls_op.insert_ls(ls_info, share::NORMAL_SWITCHOVER_STATUS, &trans))) {
      LOG_WARN("failed to insert new operation", KR(ret), K(ls_info));
@@ -856,6 +867,200 @@ int ObBalanceTaskExecuteService::set_ls_to_dropping_(const ObLSID &ls_id, ObMySQ
  return ret;
}


//While a merge_ls task is executing, the log stream is always guaranteed to be pushed to the wait_offline state.
//The purpose here is not to check whether existing resources are sufficient to create a log stream; it is only
//to guarantee that the create_scn of the log streams newly created by this round of jobs is larger than the
//offline_scn of the wait_offline log streams produced by the previous round of jobs.
ERRSIM_POINT_DEF(EN_SET_MAX_OFFLINE_SCN);
int ObBalanceTaskExecuteService::wait_can_create_new_ls_(share::SCN &create_scn)
{
  int ret = OB_SUCCESS;
  create_scn.reset();
  share::SCN offline_scn;
  int64_t offline_ls_count = 0;
  uint64_t cluster_version = GET_MIN_CLUSTER_VERSION();

  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (cluster_version < CLUSTER_VERSION_4_2_2_0) {
    //The cluster version has not been upgraded to 4.2.2.0, so offline_scn is not available.
    if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) {
      LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_));
    }
  } else if (OB_FAIL(get_max_offline_scn_(offline_scn, offline_ls_count))) {
    LOG_WARN("failed to get max offline scn", KR(ret));
  } else if (0 == offline_ls_count) {
    if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) {
      LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_));
    }
  } else if (OB_UNLIKELY(!offline_scn.is_valid() || 0 > offline_ls_count)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("offline scn is invalid", KR(ret), K(offline_scn), K(offline_ls_count));
  } else {
    const int64_t start_time = ObTimeUtility::fast_current_time();
    const int64_t TIMEOUT = GCONF.rpc_timeout;
    //for test
    if (EN_SET_MAX_OFFLINE_SCN) {
      LOG_INFO("set offline scn to max", K(offline_scn));
      offline_scn.set_max();
    }
    do {
      if (ObTimeUtility::fast_current_time() - start_time > TIMEOUT) {
        ret = OB_NEED_WAIT;
        LOG_WARN("stmt is timeout", KR(ret), K(start_time), K(TIMEOUT),
            K(create_scn), K(offline_scn));
        int tmp_ret = OB_SUCCESS;
        if (OB_TMP_FAIL(task_comment_.assign(
                "Wait timed out for GTS to exceed max offline scn for all LS"))) {
          LOG_WARN("failed to assign task comment", KR(tmp_ret), K(offline_scn));
        }
      } else if (OB_FAIL(ObLSAttrOperator::get_tenant_gts(tenant_id_, create_scn))) {
        LOG_WARN("failed to get tenant gts", KR(ret), K(tenant_id_));
      } else if (create_scn > offline_scn) {
        ret = OB_SUCCESS;
      } else {
        ret = OB_EAGAIN;
        LOG_WARN("create scn is smaller than offline scn, need wait", KR(ret),
            K(create_scn), K(offline_scn), K(offline_ls_count));
        // waiting 100ms
        ob_usleep(100L * 1000L);
      }
    } while (OB_EAGAIN == ret);
  }
  return ret;
}
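A rough standalone restatement of the waiting loop above, assuming gts() returns a monotonically increasing tenant timestamp; the helper name, the uint64_t SCN representation, and the std::chrono timeout are stand-ins for ObLSAttrOperator::get_tenant_gts, share::SCN and GCONF.rpc_timeout, not the real interfaces.

#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Returns true and sets create_scn once the tenant GTS has passed max_offline_scn,
// false if the timeout elapsed first (the caller would then report a need-wait error).
bool wait_gts_exceeds_offline_scn(const std::function<uint64_t()> &gts,
                                  uint64_t max_offline_scn,
                                  std::chrono::milliseconds timeout,
                                  uint64_t &create_scn)
{
  const auto start = std::chrono::steady_clock::now();
  while (std::chrono::steady_clock::now() - start <= timeout) {
    create_scn = gts();
    if (create_scn > max_offline_scn) {
      return true;  // every LS offlined by the previous round is ordered before create_scn
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // same 100ms backoff as above
  }
  return false;
}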

int ObBalanceTaskExecuteService::get_max_offline_scn_(share::SCN &offline_scn, int64_t &offline_ls_count)
{
  int ret = OB_SUCCESS;
  offline_scn.reset();
  offline_ls_count = 0;
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_)
      || OB_ISNULL(GCTX.location_service_)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), KP(sql_proxy_), KP(GCTX.srv_rpc_proxy_),
        KP(GCTX.location_service_));
  } else {
    ObGetLSReplayedScnProxy proxy(
        *GCTX.srv_rpc_proxy_, &obrpc::ObSrvRpcProxy::get_ls_replayed_scn);
    ObArray<int> return_code_array;
    if (OB_FAIL(get_ls_offline_scn_by_rpc_(proxy, offline_ls_count, return_code_array))) {
      LOG_WARN("failed to get ls offline scn", KR(ret));
    } else if (0 == offline_ls_count) {
      //nothing to do
    } else if (return_code_array.count() != offline_ls_count) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("offline count not equal to return code array", KR(ret),
          K(offline_ls_count), K(return_code_array));
    } else if (OB_FAIL(proxy.check_return_cnt(return_code_array.count()))) {
      LOG_WARN("fail to check return cnt", KR(ret),
          "return_cnt", return_code_array.count());
    } else {
      offline_scn.set_min();
      for (int64_t i = 0; OB_SUCC(ret) && i < return_code_array.count(); ++i) {
        if (OB_FAIL(return_code_array.at(i))) {
          LOG_WARN("send rpc is failed", KR(ret), K(i));
        } else {
          const obrpc::ObGetLSReplayedScnRes *result = proxy.get_results().at(i);
          if (OB_ISNULL(result)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("result is null", KR(ret), K(i));
          } else if (!result->get_offline_scn().is_valid()) {
            ret = OB_NEED_WAIT;
            LOG_WARN("offline scn is invalid", KR(ret), KPC(result));
            int tmp_ret = OB_SUCCESS;
            if (OB_TMP_FAIL(task_comment_.assign_fmt("Wait for LS %ld to write offline log",
                    result->get_ls_id().id()))) {
              LOG_WARN("failed to assign task comment", KR(tmp_ret), KPC(result));
            }
          } else if (result->get_offline_scn() > offline_scn) {
            offline_scn = result->get_offline_scn();
            LOG_INFO("get offline scn", K(offline_scn), KPC(result));
          }
        }
      }//end for
    }
  }

  return ret;
}
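The aggregation rule of get_max_offline_scn_, restated as a self-contained sketch where std::optional<uint64_t> stands in for a share::SCN that may still be invalid because the LS has not written its offline log yet; the zero-LS case is handled by the caller and is not modeled here.

#include <algorithm>
#include <cstdint>
#include <optional>
#include <vector>

// Returns the maximum offline SCN across the wait_offline log streams, or nullopt if
// any of them has not produced a valid offline SCN yet (which maps to OB_NEED_WAIT above).
std::optional<uint64_t> max_offline_scn(const std::vector<std::optional<uint64_t>> &per_ls_offline_scn)
{
  uint64_t max_scn = 0;
  for (const auto &scn : per_ls_offline_scn) {
    if (!scn.has_value()) {
      return std::nullopt;  // keep waiting until every LS has an offline SCN
    }
    max_scn = std::max(max_scn, *scn);
  }
  return max_scn;
}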
int ObBalanceTaskExecuteService::get_ls_offline_scn_by_rpc_(
    ObGetLSReplayedScnProxy &proxy,
    int64_t &offline_ls_count,
    ObIArray<int> &return_code_array)
{
  int ret = OB_SUCCESS;
  offline_ls_count = 0;
  ObArray<ObLSStatusInfo> status_info_array;
  ObLSStatusOperator ls_status_op;
  int tmp_ret = OB_SUCCESS;
  if (OB_UNLIKELY(!inited_)) {
    ret = OB_NOT_INIT;
    LOG_WARN("not init", KR(ret));
  } else if (OB_ISNULL(sql_proxy_) || OB_ISNULL(GCTX.srv_rpc_proxy_)
      || OB_ISNULL(GCTX.location_service_)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), KP(sql_proxy_), KP(GCTX.srv_rpc_proxy_),
        KP(GCTX.location_service_));
  } else if (OB_FAIL(ls_status_op.get_all_ls_status_by_order(tenant_id_,
          status_info_array, *sql_proxy_))) {
    LOG_WARN("failed to get ls status info array", KR(ret), K(tenant_id_));
  } else {
    obrpc::ObGetLSReplayedScnArg arg;
    ObAddr leader;
    ObTimeoutCtx ctx;
    if (OB_FAIL(ObShareUtil::set_default_timeout_ctx(ctx, GCONF.rpc_timeout))) {
      LOG_WARN("fail to set timeout ctx", KR(ret));
    }
    for (int64_t i = 0; OB_SUCC(ret) && i < status_info_array.count(); ++i) {
      ObLSStatusInfo &info = status_info_array.at(i);
      if (info.ls_is_dropping()) {
        //There is a log stream in DROPPING state and it is unclear whether it is left over
        //from the previous round of balance tasks.
        //Wait for DROPPING to become WAIT_OFFLINE first.
        ret = OB_NEED_WAIT;
        LOG_WARN("has dropping ls, need wait", K(ret), K(info));
        if (OB_TMP_FAIL(task_comment_.assign_fmt("Wait for LS %ld in DROPPING status to become OFFLINE",
                info.ls_id_.id()))) {
          LOG_WARN("failed to assign task comment", KR(tmp_ret), K(info));
        }
      } else if (!info.ls_is_wait_offline()) {
        //The tenant_dropping state cannot appear during load balancing; other states are not considered.
      } else {
        offline_ls_count++;
        const int64_t timeout = ctx.get_timeout();
        if (OB_FAIL(arg.init(tenant_id_, info.ls_id_, false))) {
          LOG_WARN("failed to init arg", KR(ret), K(arg));
          //It does not actually have to be the leader replica; querying the leader is just more
          //convenient, so whatever is returned needs no further verification.
        } else if (OB_FAIL(GCTX.location_service_->get_leader(
                GCONF.cluster_id, tenant_id_, info.ls_id_, false, leader))) {
          LOG_WARN("failed to get leader", KR(ret), K(tenant_id_), K(info));
        } else if (OB_FAIL(proxy.call(leader, timeout, tenant_id_, arg))) {
          LOG_WARN("failed to send rpc", KR(ret), K(leader), K(timeout),
              K(tenant_id_), K(arg));
        }
        if (OB_FAIL(ret)) {
          if (OB_TMP_FAIL(GCTX.location_service_->nonblock_renew(
                  GCONF.cluster_id, tenant_id_, info.ls_id_))) {
            LOG_WARN("failed to renew location", KR(ret), KR(tmp_ret), K(tenant_id_), K(info));
          }
        }
      }//end else
    }//end for
    if (0 == offline_ls_count) {
      //nothing to do
    } else if (OB_TMP_FAIL(proxy.wait_all(return_code_array))) {
      LOG_WARN("wait all batch result failed", KR(ret), KR(tmp_ret));
      ret = OB_SUCC(ret) ? tmp_ret : ret;
    }
  }
  return ret;
}

}
}


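The per-LS decision inside get_ls_offline_scn_by_rpc_, restated as a tiny standalone helper; the enum names are illustrative only, not the real share::ObLSStatus values. A DROPPING LS forces the whole check to wait, a WAIT_OFFLINE LS is queried for its offline SCN over RPC, and every other state is skipped.

enum class LsState { kNormal, kDropping, kWaitOffline };
enum class Action { kSkip, kNeedWait, kQueryOfflineScn };

// Mirrors the branch structure in the loop above.
Action classify_ls(LsState state)
{
  switch (state) {
    case LsState::kDropping:    return Action::kNeedWait;
    case LsState::kWaitOffline: return Action::kQueryOfflineScn;
    default:                    return Action::kSkip;
  }
}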
@@ -20,6 +20,7 @@
#include "share/ob_thread_mgr.h" //OBTGDefIDEnum
#include "share/ob_balance_define.h"  // ObBalanceJobID, ObBalanceTaskID
#include "share/ls/ob_ls_i_life_manager.h"//ObLSStatus
#include "rootserver/ob_rs_async_rpc_proxy.h"//get_offline_scn

namespace oceanbase
{
@@ -86,7 +87,8 @@ private:
                                   bool &skip_next_status);
  int cancel_current_task_status_(const share::ObBalanceTask &task, ObMySQLTransaction &trans, bool &skip_next_status);
  int cancel_other_init_task_(const share::ObBalanceTask &task, ObMySQLTransaction &trans);
  int process_init_task_(const share::ObBalanceTask &task, ObMySQLTransaction &trans);
  int process_init_task_(const share::ObBalanceTask &task, ObMySQLTransaction &trans,
      bool &skip_next_status);
  int wait_ls_to_target_status_(const share::ObLSID &ls_id, const share::ObLSStatus ls_status, bool &skip_next_status);
  int wait_alter_ls_(const share::ObBalanceTask &task, bool &skip_next_status);
  int set_ls_to_merge_(const share::ObBalanceTask &task, ObMySQLTransaction &trans);
@@ -101,6 +103,11 @@ private:
  int wait_tenant_ready_();
  int try_update_task_comment_(const share::ObBalanceTask &task,
  const common::ObSqlString &comment, ObISQLClient &sql_client);
  int wait_can_create_new_ls_(share::SCN &create_scn);
  int get_max_offline_scn_(share::SCN &offline_scn, int64_t &offline_ls_count);
  int get_ls_offline_scn_by_rpc_(ObGetLSReplayedScnProxy &proxy,
                          int64_t &offline_ls_count,
                          ObIArray<int> &return_code_array);
private:
  bool inited_;
  uint64_t tenant_id_;