[CP] fix standby replay 4719
This commit is contained in:
		@ -576,10 +576,14 @@ TEST_F(TestBalanceOperator, ls_balance_helper)
 | 
			
		||||
  common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy));
 | 
			
		||||
 | 
			
		||||
  ObBalanceTaskHelper new_task;
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task));
 | 
			
		||||
  ASSERT_EQ(new_task.get_operation_scn().convert_to_ts(), ts);
 | 
			
		||||
  ASSERT_EQ(ls_group_id, new_task.get_ls_group_id());
 | 
			
		||||
  ObArray<ObBalanceTaskHelper> new_task;
 | 
			
		||||
  SCN op_scn;
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, op_scn.convert_from_ts(ts));
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
 | 
			
		||||
        tenant_id, sql_proxy, op_scn, new_task));
 | 
			
		||||
  ASSERT_EQ(1, new_task.count());
 | 
			
		||||
  ASSERT_EQ(new_task.at(0).get_operation_scn().convert_to_ts(), ts);
 | 
			
		||||
  ASSERT_EQ(ls_group_id, new_task.at(0).get_ls_group_id());
 | 
			
		||||
  ts += 10000000;
 | 
			
		||||
  ObBalanceTaskHelperOp new_op(0);
 | 
			
		||||
  ls_group_id = 1001;
 | 
			
		||||
@ -588,14 +592,20 @@ TEST_F(TestBalanceOperator, ls_balance_helper)
 | 
			
		||||
      tenant_id, scn, new_op,
 | 
			
		||||
      src_ls_id, dest_ls_id, ls_group_id));
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy));
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task));
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, op_scn.convert_from_ts(ts));
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
 | 
			
		||||
        tenant_id, sql_proxy, op_scn, new_task));
 | 
			
		||||
  ASSERT_EQ(2, new_task.count());
 | 
			
		||||
  ts = 10000000000000;
 | 
			
		||||
  ASSERT_EQ(new_task.get_operation_scn().convert_to_ts(), ts);
 | 
			
		||||
  ASSERT_EQ(new_task.get_ls_group_id(), 0);
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::remove_task(tenant_id, new_task.get_operation_scn(), sql_proxy));
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task));
 | 
			
		||||
  ASSERT_EQ(new_task.at(0).get_operation_scn().convert_to_ts(), ts);
 | 
			
		||||
  ASSERT_EQ(new_task.at(0).get_ls_group_id(), 0);
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::remove_task(tenant_id,
 | 
			
		||||
        new_task.at(0).get_operation_scn(), sql_proxy));
 | 
			
		||||
  ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
 | 
			
		||||
        tenant_id, sql_proxy, op_scn, new_task));
 | 
			
		||||
  ASSERT_EQ(1, new_task.count());
 | 
			
		||||
 | 
			
		||||
  ASSERT_EQ(ls_group_id, new_task.get_ls_group_id());
 | 
			
		||||
  ASSERT_EQ(ls_group_id, new_task.at(0).get_ls_group_id());
 | 
			
		||||
}
 | 
			
		||||
} // namespace share
 | 
			
		||||
} // namespace oceanbase
 | 
			
		||||
 | 
			
		||||
@ -1077,10 +1077,9 @@ int ObRecoveryLSService::do_standby_balance_()
 | 
			
		||||
int ObRecoveryLSService::do_ls_balance_task_()
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObBalanceTaskHelper ls_balance_task;
 | 
			
		||||
  ObArray<ObBalanceTaskHelper> ls_balance_tasks;
 | 
			
		||||
  ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
 | 
			
		||||
  ObAllTenantInfo tenant_info;
 | 
			
		||||
  bool has_next_task = true;
 | 
			
		||||
  if (OB_UNLIKELY(!inited_)) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("not init", KR(ret), K(inited_));
 | 
			
		||||
@ -1090,97 +1089,156 @@ int ObRecoveryLSService::do_ls_balance_task_()
 | 
			
		||||
  } else if (OB_ISNULL(tenant_info_loader)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("mtl pointer is null", KR(ret), KP(tenant_info_loader));
 | 
			
		||||
  }
 | 
			
		||||
  while (OB_SUCC(ret) && has_next_task) {
 | 
			
		||||
    ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_,
 | 
			
		||||
          *proxy_, ls_balance_task);
 | 
			
		||||
  } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
 | 
			
		||||
    LOG_WARN("get_tenant_info failed", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
 | 
			
		||||
          tenant_id_, *proxy_, tenant_info.get_standby_scn(),
 | 
			
		||||
          ls_balance_tasks))) {
 | 
			
		||||
    if (OB_ENTRY_NOT_EXIST == ret) {
 | 
			
		||||
      ret = OB_SUCCESS;
 | 
			
		||||
      has_next_task = false;
 | 
			
		||||
    } else if (OB_FAIL(ret)) {
 | 
			
		||||
      LOG_WARN("failed to get balance task", KR(ret), K(tenant_id_));
 | 
			
		||||
    } else if (has_set_stop()) {
 | 
			
		||||
      ret = OB_IN_STOP_STATE;
 | 
			
		||||
      LOG_WARN("thread is in stop state", KR(ret));
 | 
			
		||||
    } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
 | 
			
		||||
      LOG_WARN("get_tenant_info failed", K(ret));
 | 
			
		||||
    } else if (tenant_info.get_standby_scn() >= ls_balance_task.get_operation_scn()) {
 | 
			
		||||
      const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
 | 
			
		||||
      //transfer_scn maybe sync_scn
 | 
			
		||||
      SCN transfer_scn;
 | 
			
		||||
      START_TRANSACTION(proxy_, exec_tenant_id)
 | 
			
		||||
      if (OB_FAIL(ret)) {
 | 
			
		||||
        LOG_WARN("failed to start trans", KR(ret));
 | 
			
		||||
      } else if (ls_balance_task.get_task_op().is_ls_alter()) {
 | 
			
		||||
        if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) {
 | 
			
		||||
          LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task));
 | 
			
		||||
        }
 | 
			
		||||
      } else if (ls_balance_task.get_task_op().is_transfer_end()) {
 | 
			
		||||
        transfer_scn = ls_balance_task.get_operation_scn();
 | 
			
		||||
      } else if (ls_balance_task.get_task_op().is_transfer_begin()) {
 | 
			
		||||
        //find transfer end, or tenant is in flashback
 | 
			
		||||
        ObBalanceTaskHelper transfer_end_task;
 | 
			
		||||
        ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_,
 | 
			
		||||
            ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(),
 | 
			
		||||
            ls_balance_task.get_dest_ls(), trans, transfer_end_task);
 | 
			
		||||
        if (OB_SUCC(ret)) {
 | 
			
		||||
          //if has transfer end, can remove transfer begin
 | 
			
		||||
          transfer_scn = ls_balance_task.get_operation_scn();
 | 
			
		||||
          LOG_INFO("has transfer end task", KR(ret), K(ls_balance_task), K(transfer_end_task));
 | 
			
		||||
        } else if (OB_ENTRY_NOT_EXIST != ret) {
 | 
			
		||||
          LOG_WARN("failed to find transfer end task", KR(ret), K(tenant_id_), K(ls_balance_task));
 | 
			
		||||
        } else if (tenant_info.is_prepare_flashback_for_switch_to_primary_status()
 | 
			
		||||
              || tenant_info.is_prepare_flashback_for_failover_to_primary_status()) {
 | 
			
		||||
          //check tenant_info status and check wait readable_scn is equal to sync_scn
 | 
			
		||||
          ret = OB_SUCCESS;
 | 
			
		||||
          transfer_scn = tenant_info.get_sync_scn();
 | 
			
		||||
          if (tenant_info.get_sync_scn() != tenant_info.get_standby_scn()) {
 | 
			
		||||
            ret = OB_NEED_WAIT;
 | 
			
		||||
            LOG_WARN("must wait repay to newest", KR(ret), K(tenant_id_), K(tenant_info),
 | 
			
		||||
                K(ls_balance_task));
 | 
			
		||||
          } else {
 | 
			
		||||
            LOG_INFO("replay to newest", K(tenant_info), K(ls_balance_task));
 | 
			
		||||
          }
 | 
			
		||||
        } else {
 | 
			
		||||
          ret = OB_NEED_RETRY;
 | 
			
		||||
          LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      if (OB_FAIL(ret)) {
 | 
			
		||||
      } else if (ls_balance_task.get_task_op().is_transfer_begin()
 | 
			
		||||
          || ls_balance_task.get_task_op().is_transfer_end()) {
 | 
			
		||||
        bool is_replay_finish = false;
 | 
			
		||||
        if (!transfer_scn.is_valid()) {
 | 
			
		||||
          ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
          LOG_WARN("transfer scn is invalid", KR(ret), K(transfer_scn), K(tenant_info), K(ls_balance_task));
 | 
			
		||||
        } else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(
 | 
			
		||||
                tenant_id_, ls_balance_task.get_src_ls(),
 | 
			
		||||
                ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) {
 | 
			
		||||
          LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), K(ls_balance_task),
 | 
			
		||||
              K(tenant_info), K(transfer_scn));
 | 
			
		||||
        } else if (!is_replay_finish) {
 | 
			
		||||
          ret = OB_NEED_RETRY;
 | 
			
		||||
          LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      if (FAILEDx(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_,
 | 
			
		||||
              ls_balance_task.get_operation_scn(), trans))) {
 | 
			
		||||
        LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
 | 
			
		||||
      } else {
 | 
			
		||||
        LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
 | 
			
		||||
      }
 | 
			
		||||
      END_TRANSACTION(trans)
 | 
			
		||||
      LOG_INFO("no need process balance task", K(tenant_info));
 | 
			
		||||
    } else {
 | 
			
		||||
      if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s
 | 
			
		||||
        LOG_INFO("can not remove ls balance task helper", K(ls_balance_task), K(tenant_info));
 | 
			
		||||
      }
 | 
			
		||||
      has_next_task = false;
 | 
			
		||||
      LOG_WARN("failed to load task", KR(ret), K(tenant_id_), K(tenant_info));
 | 
			
		||||
    }
 | 
			
		||||
  }//end while
 | 
			
		||||
  } else {
 | 
			
		||||
    //使用接口保证获取到的ls_balance_task都是可读点越过,可以处理的task,task是按照顺序删除的,
 | 
			
		||||
    //这个是必须要保证的
 | 
			
		||||
    for (int64_t i = 0; OB_SUCC(ret) && i < ls_balance_tasks.count() && !has_set_stop(); ++i) {
 | 
			
		||||
      const ObBalanceTaskHelper &ls_balance_task = ls_balance_tasks.at(i);
 | 
			
		||||
      //按顺序梳理task,不能如果一个task处理失败就失败掉
 | 
			
		||||
      if (OB_FAIL(try_do_ls_balance_task_(ls_balance_task, tenant_info))) {
 | 
			
		||||
        LOG_WARN("failed to ls balance task", KR(ret), K(ls_balance_task), K(tenant_info));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObRecoveryLSService::try_do_ls_balance_task_(
 | 
			
		||||
    const share::ObBalanceTaskHelper &ls_balance_task,
 | 
			
		||||
    const share::ObAllTenantInfo &tenant_info)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(!inited_)) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("not init", KR(ret), K(inited_));
 | 
			
		||||
  } else if (OB_UNLIKELY(!ls_balance_task.is_valid() || !tenant_info.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", KR(ret), K(ls_balance_task), K(tenant_info));
 | 
			
		||||
  } else if (OB_ISNULL(proxy_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("sql is null", KR(ret), KP(proxy_));
 | 
			
		||||
  } else {
 | 
			
		||||
    const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
 | 
			
		||||
    bool can_remove = false;
 | 
			
		||||
    START_TRANSACTION(proxy_, exec_tenant_id)
 | 
			
		||||
    if (OB_FAIL(ret)) {
 | 
			
		||||
      LOG_WARN("failed to start trans", KR(ret));
 | 
			
		||||
    } else if (ls_balance_task.get_task_op().is_ls_alter()) {
 | 
			
		||||
      can_remove = true;
 | 
			
		||||
      if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) {
 | 
			
		||||
        LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task));
 | 
			
		||||
      }
 | 
			
		||||
    } else if (ls_balance_task.get_task_op().is_transfer_end()) {
 | 
			
		||||
      can_remove = true;
 | 
			
		||||
      bool is_replay_finish = true;
 | 
			
		||||
      if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(
 | 
			
		||||
              tenant_id_, ls_balance_task.get_src_ls(),
 | 
			
		||||
              ls_balance_task.get_dest_ls(), ls_balance_task.get_operation_scn(),
 | 
			
		||||
              is_replay_finish))) {
 | 
			
		||||
        LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_),
 | 
			
		||||
            K(ls_balance_task), K(tenant_info));
 | 
			
		||||
      } else if (!is_replay_finish) {
 | 
			
		||||
        ret = OB_NEED_RETRY;
 | 
			
		||||
        LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task));
 | 
			
		||||
      }
 | 
			
		||||
    } else if (ls_balance_task.get_task_op().is_transfer_begin()) {
 | 
			
		||||
      if (OB_FAIL(check_transfer_begin_can_remove_(ls_balance_task, tenant_info, can_remove))) {
 | 
			
		||||
        LOG_WARN("failed to check transfer begin can remove", KR(ret),
 | 
			
		||||
            K(ls_balance_task), K(tenant_info));
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      LOG_WARN("ls balance task op is unexpected", KR(ret), K(ls_balance_task));
 | 
			
		||||
    }
 | 
			
		||||
    if (OB_FAIL(ret)) {
 | 
			
		||||
    } else if (!can_remove) {
 | 
			
		||||
      LOG_INFO("balance task helper can not remove", K(ls_balance_task), K(tenant_info));
 | 
			
		||||
    } else if (OB_FAIL(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_,
 | 
			
		||||
            ls_balance_task.get_operation_scn(), trans))) {
 | 
			
		||||
      LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
 | 
			
		||||
    }
 | 
			
		||||
    END_TRANSACTION(trans)
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObRecoveryLSService::check_transfer_begin_can_remove_(
 | 
			
		||||
    const share::ObBalanceTaskHelper &ls_balance_task,
 | 
			
		||||
    const share::ObAllTenantInfo &tenant_info,
 | 
			
		||||
    bool &can_remove)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  can_remove = true;
 | 
			
		||||
  if (OB_UNLIKELY(!inited_)) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("not init", KR(ret), K(inited_));
 | 
			
		||||
  } else if (OB_UNLIKELY(!ls_balance_task.is_valid()
 | 
			
		||||
        || !ls_balance_task.get_task_op().is_transfer_begin())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", KR(ret), K(ls_balance_task));
 | 
			
		||||
  } else if (OB_ISNULL(proxy_)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("sql can't null", K(ret), K(proxy_));
 | 
			
		||||
  } else {
 | 
			
		||||
    //find transfer end, or tenant is in flashback
 | 
			
		||||
    ObBalanceTaskHelper transfer_end_task;
 | 
			
		||||
    SCN transfer_scn;
 | 
			
		||||
    bool is_replay_finish = false;
 | 
			
		||||
    ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_,
 | 
			
		||||
        ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(),
 | 
			
		||||
        ls_balance_task.get_dest_ls(), *proxy_, transfer_end_task);
 | 
			
		||||
    if (OB_SUCC(ret)) {
 | 
			
		||||
      //if has transfer end, can remove transfer begin
 | 
			
		||||
      transfer_scn = ls_balance_task.get_operation_scn();
 | 
			
		||||
      LOG_INFO("has transfer end task, can remove transfer begin", KR(ret),
 | 
			
		||||
          K(ls_balance_task), K(transfer_end_task));
 | 
			
		||||
    } else if (OB_ENTRY_NOT_EXIST != ret) {
 | 
			
		||||
      LOG_WARN("failed to find transfer end task", KR(ret), K(tenant_id_), K(ls_balance_task));
 | 
			
		||||
    } else if (tenant_info.is_prepare_flashback_for_switch_to_primary_status()
 | 
			
		||||
        || tenant_info.is_prepare_flashback_for_failover_to_primary_status()) {
 | 
			
		||||
      //check tenant_info status and check wait readable_scn is equal to sync_scn
 | 
			
		||||
      ret = OB_SUCCESS;
 | 
			
		||||
      transfer_scn = tenant_info.get_sync_scn();
 | 
			
		||||
      if (tenant_info.get_sync_scn() != tenant_info.get_standby_scn()) {
 | 
			
		||||
        can_remove = false;
 | 
			
		||||
        LOG_WARN("There are transfer tasks in progress. Must wait for replay to newest",
 | 
			
		||||
KR(ret), K(tenant_id_), K(tenant_info), K(ls_balance_task));
 | 
			
		||||
      } else {
 | 
			
		||||
        LOG_INFO("replay to newest, can remove transfer begin before switchover/failover to primary",
 | 
			
		||||
            K(tenant_info), K(ls_balance_task));
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      can_remove = false;
 | 
			
		||||
      LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task));
 | 
			
		||||
      ret = OB_SUCCESS;
 | 
			
		||||
    }
 | 
			
		||||
    if (OB_FAIL(ret) || !can_remove) {
 | 
			
		||||
    } else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(
 | 
			
		||||
            tenant_id_, ls_balance_task.get_src_ls(),
 | 
			
		||||
            ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) {
 | 
			
		||||
      LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), K(ls_balance_task),
 | 
			
		||||
          K(tenant_info), K(transfer_scn));
 | 
			
		||||
    } else if (!is_replay_finish) {
 | 
			
		||||
      ret = OB_NEED_RETRY;
 | 
			
		||||
      LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObRecoveryLSService::do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task,
 | 
			
		||||
    common::ObMySQLTransaction &trans)
 | 
			
		||||
 | 
			
		||||
@ -136,6 +136,11 @@ private:
 | 
			
		||||
 //thread1
 | 
			
		||||
 int do_standby_balance_();
 | 
			
		||||
 int do_ls_balance_task_();
 | 
			
		||||
 int try_do_ls_balance_task_(const share::ObBalanceTaskHelper &ls_balance_task,
 | 
			
		||||
     const share::ObAllTenantInfo &tenant_info);
 | 
			
		||||
 int check_transfer_begin_can_remove_(const share::ObBalanceTaskHelper &ls_balance_task,
 | 
			
		||||
     const share::ObAllTenantInfo &tenant_info,
 | 
			
		||||
     bool &can_remove);
 | 
			
		||||
 int do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task,
 | 
			
		||||
                               common::ObMySQLTransaction &trans);
 | 
			
		||||
 int reset_restore_proxy_(ObRestoreSourceServiceAttr &service_attr);
 | 
			
		||||
 | 
			
		||||
@ -691,7 +691,7 @@ int ObTenantBalanceService::lock_and_check_balance_job_(
 | 
			
		||||
      trans,
 | 
			
		||||
      tenant_id,
 | 
			
		||||
      OB_ALL_BALANCE_JOB_TID,
 | 
			
		||||
      EXCLUSIVE))) {
 | 
			
		||||
      EXCLUSIVE, false))) {
 | 
			
		||||
    LOG_WARN("lock inner table failed", KR(ret), K(tenant_id));
 | 
			
		||||
  } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
 | 
			
		||||
      tenant_id,
 | 
			
		||||
 | 
			
		||||
@ -525,13 +525,15 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_()
 | 
			
		||||
      LOG_INFO("data version is smaller than 4200, no need check", K(user_compat_version));
 | 
			
		||||
    } else {
 | 
			
		||||
      bool is_finish = false;
 | 
			
		||||
      ObBalanceTaskHelper ls_balance_task;
 | 
			
		||||
      ObArray<ObBalanceTaskHelper> ls_balance_tasks;
 | 
			
		||||
      ObBalanceTaskArray balance_task_array;
 | 
			
		||||
      share::ObAllTenantInfo cur_tenant_info;
 | 
			
		||||
      int tmp_ret = OB_SUCCESS;
 | 
			
		||||
      SCN max_scn;
 | 
			
		||||
      max_scn.set_max();
 | 
			
		||||
      while (!THIS_WORKER.is_timeout() && OB_SUCC(ret) && !is_finish) {
 | 
			
		||||
        if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_,
 | 
			
		||||
                *sql_proxy_, ls_balance_task))) {
 | 
			
		||||
        if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(tenant_id_,
 | 
			
		||||
                *sql_proxy_, max_scn, ls_balance_tasks))) {
 | 
			
		||||
        } else if (OB_ENTRY_NOT_EXIST == ret) {
 | 
			
		||||
          ret = OB_SUCCESS;
 | 
			
		||||
          balance_task_array.reset();
 | 
			
		||||
@ -579,7 +581,7 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_()
 | 
			
		||||
            LOG_WARN("failed to notify recovery ls service", KR(tmp_ret));
 | 
			
		||||
          }
 | 
			
		||||
          usleep(100L * 1000L);
 | 
			
		||||
          LOG_INFO("has balance task not finish", K(ls_balance_task),
 | 
			
		||||
          LOG_INFO("has balance task not finish", K(ls_balance_tasks),
 | 
			
		||||
              K(balance_task_array), K(cur_tenant_info));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
@ -1391,7 +1391,7 @@ int ObTenantSnapshotUtil::check_tenant_is_in_transfer_procedure_(
 | 
			
		||||
                         trans,
 | 
			
		||||
                         tenant_id,
 | 
			
		||||
                         OB_ALL_BALANCE_JOB_TID,
 | 
			
		||||
                         EXCLUSIVE))) {
 | 
			
		||||
                         EXCLUSIVE, false/*is_from_sql*/))) {
 | 
			
		||||
    LOG_WARN("lock inner table failed", KR(ret), K(tenant_id));
 | 
			
		||||
  } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
 | 
			
		||||
                         tenant_id,
 | 
			
		||||
 | 
			
		||||
@ -163,6 +163,22 @@ bool ObBalanceTaskHelper::is_valid() const
 | 
			
		||||
  return operation_scn_.is_valid() && balance_task_helper_meta_.is_valid() && is_valid_tenant_id(tenant_id_);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObBalanceTaskHelper::assign(const ObBalanceTaskHelper &other)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (this != &other) {
 | 
			
		||||
    reset();
 | 
			
		||||
    if (OB_FAIL(balance_task_helper_meta_.assign(other.balance_task_helper_meta_))) {
 | 
			
		||||
      LOG_WARN("failed to assign balance task helper meta", KR(ret), K(other));
 | 
			
		||||
    } else {
 | 
			
		||||
      operation_scn_ = other.operation_scn_;
 | 
			
		||||
      tenant_id_ = other.tenant_id_;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
void ObBalanceTaskHelper::reset()
 | 
			
		||||
{
 | 
			
		||||
  operation_scn_.reset();
 | 
			
		||||
@ -192,29 +208,32 @@ int ObBalanceTaskHelperTableOperator::insert_ls_balance_task(const ObBalanceTask
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObBalanceTaskHelperTableOperator::pop_task(const uint64_t tenant_id,
 | 
			
		||||
int ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(const uint64_t tenant_id,
 | 
			
		||||
                      ObISQLClient &client,
 | 
			
		||||
                      ObBalanceTaskHelper &ls_balance_task)
 | 
			
		||||
                      const share::SCN &max_operation_scn,
 | 
			
		||||
                      ObIArray<ObBalanceTaskHelper> &ls_balance_task)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ls_balance_task.reset();
 | 
			
		||||
  ObSqlString sql;
 | 
			
		||||
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
 | 
			
		||||
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
 | 
			
		||||
        || !max_operation_scn.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
 | 
			
		||||
  } else if (OB_FAIL(sql.assign_fmt("select * from %s order by operation_scn asc limit 1",
 | 
			
		||||
                                    OB_ALL_BALANCE_TASK_HELPER_TNAME))) {
 | 
			
		||||
    LOG_WARN("failed to assign sql", KR(ret), K(sql));
 | 
			
		||||
  } else if (OB_FAIL(exec_get_single_row_(sql, tenant_id, client, ls_balance_task))) {
 | 
			
		||||
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(max_operation_scn));
 | 
			
		||||
  } else if (OB_FAIL(sql.assign_fmt("select * from %s where operation_scn <= %lu order by operation_scn",
 | 
			
		||||
          OB_ALL_BALANCE_TASK_HELPER_TNAME,
 | 
			
		||||
          max_operation_scn.get_val_for_inner_table_field()))) {
 | 
			
		||||
    LOG_WARN("failed to assign sql", KR(ret), K(max_operation_scn), K(sql));
 | 
			
		||||
  } else if (OB_FAIL(exec_get_rows_(sql, tenant_id, client, ls_balance_task))) {
 | 
			
		||||
    LOG_WARN("failed to get row", KR(ret), K(sql), K(tenant_id));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlString &sql,
 | 
			
		||||
int ObBalanceTaskHelperTableOperator::exec_get_rows_(const common::ObSqlString &sql,
 | 
			
		||||
                             const uint64_t tenant_id,
 | 
			
		||||
                             ObISQLClient &client,
 | 
			
		||||
                             ObBalanceTaskHelper &ls_balance_task)
 | 
			
		||||
                             ObIArray<ObBalanceTaskHelper> &ls_balance_tasks)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || sql.empty())) {
 | 
			
		||||
@ -229,34 +248,41 @@ int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlSt
 | 
			
		||||
      } else if (OB_ISNULL(result = res.get_result())) {
 | 
			
		||||
        ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
        LOG_WARN("failed to get sql result", KR(ret));
 | 
			
		||||
      } else if (OB_FAIL(result->next())) {
 | 
			
		||||
        if (OB_ITER_END == ret) {
 | 
			
		||||
          ret = OB_ENTRY_NOT_EXIST;
 | 
			
		||||
          LOG_WARN("empty ls_balance_task", KR(ret), K(sql));
 | 
			
		||||
        } else {
 | 
			
		||||
          LOG_WARN("failed to get ls_balance_task", KR(ret), K(sql));
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        ObString task_op_str;
 | 
			
		||||
        uint64_t op_scn_value = 0;
 | 
			
		||||
        SCN op_scn;
 | 
			
		||||
        int64_t src_ls = 0;
 | 
			
		||||
        int64_t dest_ls = 0;
 | 
			
		||||
        uint64_t ls_group_id = OB_INVALID_ID;
 | 
			
		||||
        EXTRACT_UINT_FIELD_MYSQL(*result, "operation_scn", op_scn_value, uint64_t);
 | 
			
		||||
        EXTRACT_INT_FIELD_MYSQL(*result, "src_ls", src_ls, int64_t);
 | 
			
		||||
        EXTRACT_INT_FIELD_MYSQL(*result, "dest_ls", dest_ls, int64_t);
 | 
			
		||||
        EXTRACT_INT_FIELD_MYSQL(*result, "ls_group_id", ls_group_id, uint64_t);
 | 
			
		||||
        EXTRACT_VARCHAR_FIELD_MYSQL(*result, "operation_type", task_op_str);
 | 
			
		||||
        if (OB_FAIL(ret)) {
 | 
			
		||||
          LOG_WARN("failed to get cell", KR(ret), K(task_op_str), K(op_scn), K(src_ls), K(dest_ls), K(ls_group_id));
 | 
			
		||||
        } else if (OB_FAIL(op_scn.convert_for_inner_table_field(op_scn_value))) {
 | 
			
		||||
          LOG_WARN("failed to convert for inner table", KR(ret), K(op_scn_value));
 | 
			
		||||
        } else if (OB_FAIL(ls_balance_task.init(tenant_id, op_scn, ObBalanceTaskHelperOp(task_op_str), ObLSID(src_ls),
 | 
			
		||||
                ObLSID(dest_ls), ls_group_id))) {
 | 
			
		||||
          LOG_WARN("failed to init ls_balance_task", KR(ret), K(tenant_id), K(task_op_str), K(op_scn), K(src_ls), K(ls_group_id));
 | 
			
		||||
        while(OB_SUCC(ret) && OB_SUCC(result->next())) {
 | 
			
		||||
          ObBalanceTaskHelper task_help;
 | 
			
		||||
          ObString task_op_str;
 | 
			
		||||
          uint64_t op_scn_value = 0;
 | 
			
		||||
          SCN op_scn;
 | 
			
		||||
          int64_t src_ls = 0;
 | 
			
		||||
          int64_t dest_ls = 0;
 | 
			
		||||
          uint64_t ls_group_id = OB_INVALID_ID;
 | 
			
		||||
          EXTRACT_UINT_FIELD_MYSQL(*result, "operation_scn", op_scn_value, uint64_t);
 | 
			
		||||
          EXTRACT_INT_FIELD_MYSQL(*result, "src_ls", src_ls, int64_t);
 | 
			
		||||
          EXTRACT_INT_FIELD_MYSQL(*result, "dest_ls", dest_ls, int64_t);
 | 
			
		||||
          EXTRACT_INT_FIELD_MYSQL(*result, "ls_group_id", ls_group_id, uint64_t);
 | 
			
		||||
          EXTRACT_VARCHAR_FIELD_MYSQL(*result, "operation_type", task_op_str);
 | 
			
		||||
          if (OB_FAIL(ret)) {
 | 
			
		||||
            LOG_WARN("failed to get cell", KR(ret), K(task_op_str), K(op_scn), K(src_ls), K(dest_ls), K(ls_group_id));
 | 
			
		||||
          } else if (OB_FAIL(op_scn.convert_for_inner_table_field(op_scn_value))) {
 | 
			
		||||
            LOG_WARN("failed to convert for inner table", KR(ret), K(op_scn_value));
 | 
			
		||||
          } else if (OB_FAIL(task_help.init(tenant_id, op_scn, ObBalanceTaskHelperOp(task_op_str), ObLSID(src_ls),
 | 
			
		||||
                  ObLSID(dest_ls), ls_group_id))) {
 | 
			
		||||
            LOG_WARN("failed to init ls_balance_task", KR(ret), K(tenant_id), K(task_op_str), K(op_scn), K(src_ls), K(ls_group_id));
 | 
			
		||||
          } else if (OB_FAIL(ls_balance_tasks.push_back(task_help))) {
 | 
			
		||||
            LOG_WARN("failed to push back task", KR(ret), K(task_help));
 | 
			
		||||
          }
 | 
			
		||||
        }//end while
 | 
			
		||||
        if (OB_ITER_END == ret) {
 | 
			
		||||
          ret = OB_SUCCESS;
 | 
			
		||||
          if (0 == ls_balance_tasks.count()) {
 | 
			
		||||
            ret = OB_ENTRY_NOT_EXIST;
 | 
			
		||||
            LOG_WARN("no balance task help", KR(ret), K(sql));
 | 
			
		||||
          }
 | 
			
		||||
        } else {
 | 
			
		||||
          LOG_WARN("failed to get cell", KR(ret));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      }//end else
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
@ -313,7 +339,6 @@ int ObBalanceTaskHelperTableOperator::try_find_transfer_end(const uint64_t tenan
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObSqlString sql;
 | 
			
		||||
  ObBalanceTaskHelperOp op(ObBalanceTaskHelperOp::LS_BALANCE_TASK_OP_TRANSFER_END);
 | 
			
		||||
  const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
 | 
			
		||||
  if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !op_scn.is_valid()
 | 
			
		||||
        || !src_ls.is_valid() || !dest_ls.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
@ -328,6 +353,28 @@ int ObBalanceTaskHelperTableOperator::try_find_transfer_end(const uint64_t tenan
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/**
 * @description: run sql through exec_get_rows_() and return the only task it yields
 * @param[in] sql : query text, must not be empty
 * @param[in] tenant_id : must not be OB_INVALID_TENANT_ID
 * @param[in] client : sql client forwarded to exec_get_rows_()
 * @param[out] ls_balance_task : filled via assign() from the single fetched task
 * @return :
 *  OB_SUCCESS : exactly one task was fetched and assigned
 *  OB_INVALID_ARGUMENT : tenant_id is invalid or sql is empty
 *  OB_ERR_UNEXPECTED : exec_get_rows_() succeeded but the task count is not 1
 *  OTHER : error code propagated from exec_get_rows_() or assign()
 */
int ObBalanceTaskHelperTableOperator::exec_get_single_row_(
    const common::ObSqlString &sql,
    const uint64_t tenant_id,
    ObISQLClient &client,
    ObBalanceTaskHelper &ls_balance_task)
{
  int ret = OB_SUCCESS;
  // Both inputs must be usable before any query is issued.
  const bool args_ok = (OB_INVALID_TENANT_ID != tenant_id) && !sql.empty();
  if (OB_UNLIKELY(!args_ok)) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(sql));
  } else {
    // Reuse the multi-row reader, then insist on exactly one result.
    ObArray<ObBalanceTaskHelper> task_array;
    if (OB_FAIL(exec_get_rows_(sql, tenant_id, client, task_array))) {
      LOG_WARN("failed to get single row", KR(ret), K(sql), K(tenant_id));
    } else {
      const bool has_single_row = (1 == task_array.count());
      if (!has_single_row) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("task array count is not expected", KR(ret), K(sql), K(task_array));
      } else if (OB_FAIL(ls_balance_task.assign(task_array.at(0)))) {
        LOG_WARN("failed to assign balance task", KR(ret), K(task_array));
      }
    }
  }
  return ret;
}
 | 
			
		||||
 | 
			
		||||
}//end of share
 | 
			
		||||
}//end of ob
 | 
			
		||||
 | 
			
		||||
@ -118,6 +118,7 @@ public:
 | 
			
		||||
           const share::ObLSID &dest_ls,
 | 
			
		||||
           const uint64_t ls_group_id);
 | 
			
		||||
  bool is_valid() const;
 | 
			
		||||
  int assign(const ObBalanceTaskHelper &other);
 | 
			
		||||
  TO_STRING_KV(K_(balance_task_helper_meta), K_(operation_scn));
 | 
			
		||||
  share::SCN get_operation_scn() const
 | 
			
		||||
  {
 | 
			
		||||
@ -161,18 +162,20 @@ public:
 | 
			
		||||
  static int insert_ls_balance_task(const ObBalanceTaskHelper &ls_balance_task,
 | 
			
		||||
                     ObISQLClient &client);
 | 
			
		||||
  /**
 | 
			
		||||
   * @description: get task has min operation scn
 | 
			
		||||
   * @description: get tasks whose operation scn is less than max_scn
 | 
			
		||||
   * @param[in] tenant_id : user_tenant_id
 | 
			
		||||
   * @param[in] client : sql client or trans
 | 
			
		||||
   * @param[out] ls_balance_task : ls_balance_task of min operation_scn
 | 
			
		||||
   * @param[in] max_operation_scn : max_scn
 | 
			
		||||
   * @param[out] ls_balance_tasks : ls_balance_tasks whose operation_scn is less than max_scn
 | 
			
		||||
   * @return :
 | 
			
		||||
   *  OB_SUCCESS : get a valid ls_balance_task
 | 
			
		||||
   *  OB_SUCCESS : get valid ls_balance_task array
 | 
			
		||||
   *  OB_ENTRY_NOT_EXIST : empty
 | 
			
		||||
   *  OTHER : fail
 | 
			
		||||
   */
 | 
			
		||||
  static int pop_task(const uint64_t tenant_id,
 | 
			
		||||
  static int load_tasks_order_by_scn(const uint64_t tenant_id,
 | 
			
		||||
                      ObISQLClient &client,
 | 
			
		||||
                      ObBalanceTaskHelper &ls_balance_task);
 | 
			
		||||
                      const share::SCN &max_operation_scn,
 | 
			
		||||
                      ObIArray<ObBalanceTaskHelper> &ls_balance_task);
 | 
			
		||||
  /**
 | 
			
		||||
   * @description: remove task of operation_scn
 | 
			
		||||
   * @param[in] tenant_id : user_tenant_id
 | 
			
		||||
@ -207,10 +210,13 @@ public:
 | 
			
		||||
                                   ObISQLClient &client,
 | 
			
		||||
                                   ObBalanceTaskHelper &ls_balance_task);
 | 
			
		||||
private:
 | 
			
		||||
  static int exec_get_single_row_(const common::ObSqlString &sql,
 | 
			
		||||
  static int exec_get_rows_(const common::ObSqlString &sql,
 | 
			
		||||
                             const uint64_t tenant_id,
 | 
			
		||||
                             ObISQLClient &client,
 | 
			
		||||
                             ObBalanceTaskHelper &ls_balance_task);
 | 
			
		||||
                             ObIArray<ObBalanceTaskHelper> &ls_balance_tasks);
 | 
			
		||||
  static int exec_get_single_row_(const common::ObSqlString &sql,
 | 
			
		||||
      const uint64_t tenant_id, ObISQLClient &client,
 | 
			
		||||
      ObBalanceTaskHelper &ls_balance_task);
 | 
			
		||||
};
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -32,7 +32,8 @@ int ObInnerTableLockUtil::lock_inner_table_in_trans(
 | 
			
		||||
    common::ObMySQLTransaction &trans,
 | 
			
		||||
    const uint64_t tenant_id,
 | 
			
		||||
    const uint64_t inner_table_id,
 | 
			
		||||
    const ObTableLockMode &lock_mode)
 | 
			
		||||
    const ObTableLockMode &lock_mode,
 | 
			
		||||
    const bool is_from_sql)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObInnerSQLConnection *conn = NULL;
 | 
			
		||||
@ -54,6 +55,7 @@ int ObInnerTableLockUtil::lock_inner_table_in_trans(
 | 
			
		||||
    table_lock_arg.timeout_us_ = ctx.get_timeout();
 | 
			
		||||
    table_lock_arg.table_id_ = inner_table_id;
 | 
			
		||||
    table_lock_arg.op_type_ = IN_TRANS_COMMON_LOCK;
 | 
			
		||||
    table_lock_arg.is_from_sql_ = is_from_sql;
 | 
			
		||||
    if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(tenant_id, table_lock_arg, conn))) {
 | 
			
		||||
      LOG_WARN("lock table failed", KR(ret), K(table_lock_arg));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -47,6 +47,7 @@ public:
 | 
			
		||||
   * @param[in] tenant_id:       tenant_id of the inner table
 | 
			
		||||
   * @param[in] inner_table_id:  inner table id which you want to lock
 | 
			
		||||
   * @param[in] lock_mode:       table lock mode
 | 
			
		||||
   * @param[in] is_from_sql:     true if the request is from sql, in which case table_lock can retry
 | 
			
		||||
   * @return
 | 
			
		||||
   * - OB_SUCCESS:               lock inner table successfully
 | 
			
		||||
   * - OB_TRY_LOCK_ROW_CONFLICT: lock conflict
 | 
			
		||||
@ -56,7 +57,8 @@ public:
 | 
			
		||||
      common::ObMySQLTransaction &trans,
 | 
			
		||||
      const uint64_t tenant_id,
 | 
			
		||||
      const uint64_t inner_table_id,
 | 
			
		||||
      const ObTableLockMode &lock_mode);
 | 
			
		||||
      const ObTableLockMode &lock_mode,
 | 
			
		||||
      const bool is_from_sql);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class ObLSObjLockUtil
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user