[CP] fix transfer and switchover
This commit is contained in:
@ -183,7 +183,7 @@ void ObRecoveryLSService::do_work()
|
||||
|
||||
if (OB_TMP_FAIL(do_ls_balance_task_())) {
|
||||
ret = OB_SUCC(ret) ? tmp_ret : ret;
|
||||
LOG_WARN("failed to process alter ls group", KR(ret), KR(tmp_ret));
|
||||
LOG_WARN("failed to process ls balance task", KR(ret), KR(tmp_ret));
|
||||
}
|
||||
|
||||
(void)try_tenant_upgrade_end_();
|
||||
@ -1107,22 +1107,51 @@ int ObRecoveryLSService::do_ls_balance_task_()
|
||||
} else if (tenant_info.get_standby_scn() >= ls_balance_task.get_operation_scn()) {
|
||||
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
|
||||
START_TRANSACTION(proxy_, exec_tenant_id)
|
||||
if (FAILEDx(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_,
|
||||
ls_balance_task.get_operation_scn(), trans))) {
|
||||
LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
|
||||
if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to start trans", KR(ret));
|
||||
} else if (ls_balance_task.get_task_op().is_ls_alter()) {
|
||||
if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) {
|
||||
LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task));
|
||||
}
|
||||
} else if (ls_balance_task.get_task_op().is_transfer_begin()
|
||||
|| ls_balance_task.get_task_op().is_transfer_end()) {
|
||||
//nothing
|
||||
} else if (ls_balance_task.get_task_op().is_transfer_end()) {
|
||||
} else if (ls_balance_task.get_task_op().is_transfer_begin()) {
|
||||
//find transfer end, or tenant is in flashback
|
||||
ObBalanceTaskHelper transfer_end_task;
|
||||
ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_,
|
||||
ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(),
|
||||
ls_balance_task.get_dest_ls(), trans, transfer_end_task);
|
||||
if (OB_SUCC(ret)) {
|
||||
//if has transfer end, can remove transfer begin
|
||||
LOG_INFO("has transfer end task, can remove transfer begin", KR(ret),
|
||||
K(ls_balance_task), K(transfer_end_task));
|
||||
} else if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to find transfer end task", KR(ret), K(tenant_id_), K(ls_balance_task));
|
||||
} else if (tenant_info.is_prepare_flashback_for_switch_to_primary_status()
|
||||
|| tenant_info.is_prepare_flashback_for_failover_to_primary_status()) {
|
||||
//check tenant_info status and check wait readable_scn is equal to sync_scn
|
||||
ret = OB_SUCCESS;
|
||||
if (tenant_info.get_sync_scn() != tenant_info.get_standby_scn()) {
|
||||
ret = OB_NEED_WAIT;
|
||||
LOG_WARN("must wait repay to newest", KR(ret), K(tenant_id_), K(tenant_info),
|
||||
K(ls_balance_task));
|
||||
} else {
|
||||
LOG_INFO("repay to newest, can remove transfer begin before failover", K(tenant_info), K(ls_balance_task));
|
||||
}
|
||||
} else {
|
||||
ret = OB_NEED_RETRY;
|
||||
LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task));
|
||||
}
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls balance task op is unexpected", KR(ret), K(ls_balance_task));
|
||||
}
|
||||
if (FAILEDx(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_,
|
||||
ls_balance_task.get_operation_scn(), trans))) {
|
||||
LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
|
||||
} else {
|
||||
LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
|
||||
}
|
||||
END_TRANSACTION(trans)
|
||||
LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
|
||||
} else {
|
||||
if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s
|
||||
LOG_INFO("can not remove ls balance task helper", K(ls_balance_task), K(tenant_info));
|
||||
|
@ -516,15 +516,45 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_()
|
||||
} else {
|
||||
bool is_finish = false;
|
||||
ObBalanceTaskHelper ls_balance_task;
|
||||
ObBalanceTaskArray balance_task_array;
|
||||
share::ObAllTenantInfo cur_tenant_info;
|
||||
while (!THIS_WORKER.is_timeout() && OB_SUCC(ret) && !is_finish) {
|
||||
if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_,
|
||||
*sql_proxy_, ls_balance_task))) {
|
||||
} else if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
is_finish = true;
|
||||
} else if (OB_SUCC(ret)) {
|
||||
balance_task_array.reset();
|
||||
/*
|
||||
__all_balance_task_helper表被清空不代表没有transfer任务,例子:
|
||||
租户A是主库,发起了一轮负载均衡,balance_task表里进入transfer状态但是没有结束的时候切换成备库。
|
||||
租户B是备库,在切成主库的时候会在回放到最新的时候,把TRANSFER_BEGIN的给清理掉。
|
||||
如果租户B作为主库的时候也执行了几轮transfer任务但是还是没有结束掉balance_task表的transfer状态。
|
||||
这个时候租户A在切成主库的时候是没有办法判断自己是否有transfer的发生。
|
||||
为了解决这个问题,我们去读取了__all_balance_task表,如果这个表中有处于transfer状态的任务,则一定要等回放到最新。
|
||||
同时这里也有一个问题:可读点会不会特别落后,我们从两个方面论述A租户可读点不会有问题
|
||||
1. 如果可以清理__all_balance_task_helper表,则可读点一定越过了表中的记录,即使B新开启了一轮负载均衡任务,那也不会有问题
|
||||
2. 单纯的经过主切备,可读点会推高越过最新的GTS,所以肯定可以读到最新的transfer任务*/
|
||||
if (OB_FAIL(ObBalanceTaskTableOperator::load_need_transfer_task(
|
||||
tenant_id_, balance_task_array, *sql_proxy_))) {
|
||||
LOG_WARN("failed to load need transfer task", KR(ret), K(tenant_id_));
|
||||
} else if (0 == balance_task_array.count()) {
|
||||
is_finish = true;
|
||||
LOG_INFO("balance task finish", K(tenant_id_));
|
||||
} else if (OB_FAIL(ObAllTenantInfoProxy::load_tenant_info(
|
||||
tenant_id_, sql_proxy_, false, cur_tenant_info))) {
|
||||
LOG_WARN("failed to load tenant info", KR(ret), K(tenant_id_));
|
||||
} else if (cur_tenant_info.get_sync_scn() == cur_tenant_info.get_standby_scn()) {
|
||||
is_finish = true;
|
||||
LOG_INFO("has transfer task, and repaly to newest", KR(ret), K(cur_tenant_info));
|
||||
}
|
||||
} else if (OB_FAIL(ret)) {
|
||||
LOG_WARN("failed to pop task", KR(ret), K(tenant_id_));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && !is_finish) {
|
||||
usleep(100L * 1000L);
|
||||
LOG_INFO("has balance task not finish", K(ls_balance_task));
|
||||
LOG_INFO("has balance task not finish", K(ls_balance_task),
|
||||
K(balance_task_array), K(cur_tenant_info));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
|
@ -205,6 +205,21 @@ int ObBalanceTaskHelperTableOperator::pop_task(const uint64_t tenant_id,
|
||||
} else if (OB_FAIL(sql.assign_fmt("select * from %s order by operation_scn asc limit 1",
|
||||
OB_ALL_BALANCE_TASK_HELPER_TNAME))) {
|
||||
LOG_WARN("failed to assign sql", KR(ret), K(sql));
|
||||
} else if (OB_FAIL(exec_get_single_row_(sql, tenant_id, client, ls_balance_task))) {
|
||||
LOG_WARN("failed to get row", KR(ret), K(sql), K(tenant_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlString &sql,
|
||||
const uint64_t tenant_id,
|
||||
ObISQLClient &client,
|
||||
ObBalanceTaskHelper &ls_balance_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || sql.empty())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(sql));
|
||||
} else {
|
||||
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||
HEAP_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
@ -287,5 +302,32 @@ int ObBalanceTaskHelperTableOperator::remove_task(const uint64_t tenant_id,
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBalanceTaskHelperTableOperator::try_find_transfer_end(const uint64_t tenant_id,
|
||||
const share::SCN &op_scn,
|
||||
const ObLSID &src_ls,
|
||||
const ObLSID &dest_ls,
|
||||
ObISQLClient &client,
|
||||
ObBalanceTaskHelper &ls_balance_task)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
ObBalanceTaskHelperOp op(ObBalanceTaskHelperOp::LS_BALANCE_TASK_OP_TRANSFER_END);
|
||||
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !op_scn.is_valid()
|
||||
|| !src_ls.is_valid() || !dest_ls.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(op_scn), K(src_ls), K(dest_ls));
|
||||
} else if (OB_FAIL(sql.assign_fmt("select * from %s where operation_type = '%s' and "
|
||||
"operation_scn > %lu and src_ls = %ld and dest_ls = %ld order by operation_scn limit 1",
|
||||
OB_ALL_BALANCE_TASK_HELPER_TNAME, op.to_str(), op_scn.get_val_for_inner_table_field(), src_ls.id(), dest_ls.id()))) {
|
||||
LOG_WARN("failed to assign sql", KR(ret), K(op), K(op_scn), K(src_ls), K(dest_ls), K(sql));
|
||||
} else if (OB_FAIL(exec_get_single_row_(sql, tenant_id, client, ls_balance_task))) {
|
||||
LOG_WARN("failed to get single row", KR(ret), K(sql), K(tenant_id));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
}//end of share
|
||||
}//end of ob
|
||||
|
@ -187,6 +187,30 @@ public:
|
||||
ObISQLClient &client);
|
||||
static int fill_dml_spliter(share::ObDMLSqlSplicer &dml,
|
||||
const ObBalanceTaskHelper &ls_balance_task);
|
||||
/**
|
||||
* @description: get transfer end task while has transfer begin
|
||||
* @param[in] tenant_id : user_tenant_id
|
||||
* @param[in] op_scn : transfer begin operation scn, transfer end 's scn is larger than this
|
||||
* @param[in] src_ls : transfer_begin's source ls
|
||||
* @param[in] desc_ls : transfer_begin's destination ls
|
||||
* @param[in] client : sql client or trans
|
||||
* @param[out] ls_balance_task : ls_balance_task of min operation_scn
|
||||
* @return :
|
||||
* OB_SUCCESS : get a valid ls_balance_task
|
||||
* OB_ENTRY_NOT_EXIST : empty
|
||||
* OTHER : fail
|
||||
*/
|
||||
static int try_find_transfer_end(const uint64_t tenant_id,
|
||||
const share::SCN &op_scn,
|
||||
const ObLSID &src_ls,
|
||||
const ObLSID &dest_ls,
|
||||
ObISQLClient &client,
|
||||
ObBalanceTaskHelper &ls_balance_task);
|
||||
private:
|
||||
static int exec_get_single_row_(const common::ObSqlString &sql,
|
||||
const uint64_t tenant_id,
|
||||
ObISQLClient &client,
|
||||
ObBalanceTaskHelper &ls_balance_task);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -814,6 +814,7 @@ int ObBalanceTaskTableOperator::remove_parent_task(const uint64_t tenant_id,
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBalanceTaskTableOperator::load_can_execute_task(const uint64_t tenant_id,
|
||||
ObBalanceTaskIArray &task_array,
|
||||
ObISQLClient &client)
|
||||
@ -1067,6 +1068,29 @@ int ObBalanceTaskTableOperator::get_merge_task_dest_ls_by_src_ls(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBalanceTaskTableOperator::load_need_transfer_task(const uint64_t tenant_id,
|
||||
ObBalanceTaskIArray &task_array,
|
||||
ObISQLClient &client)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
task_array.reset();
|
||||
ObSqlString sql;
|
||||
ObBalanceTaskStatus task_status(ObBalanceTaskStatus::BALANCE_TASK_STATUS_TRANSFER);
|
||||
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid tenant id", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(sql.assign_fmt("select * from %s where status = '%s'",
|
||||
OB_ALL_BALANCE_TASK_TNAME, task_status.to_str()))) {
|
||||
LOG_WARN("failed to assign sql", KR(ret), K(sql), K(task_status));
|
||||
} else if (OB_FAIL(read_tasks_(tenant_id, client, sql, task_array))) {
|
||||
LOG_WARN("failed to read task", KR(ret), K(tenant_id), K(sql));
|
||||
}
|
||||
LOG_INFO("load need transfer balance task", KR(ret), K(task_array), K(sql));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int ObBalanceTaskMDSHelper::on_register(
|
||||
const char* buf,
|
||||
const int64_t len,
|
||||
|
@ -452,6 +452,17 @@ public:
|
||||
const uint64_t tenant_id,
|
||||
const ObLSID &src_ls,
|
||||
ObLSID &dest_ls);
|
||||
/*
|
||||
* @description: load all task which has transfer task
|
||||
* @param[in] tenant_id : user_tenant_id
|
||||
* @param[out] balance_task_array : get all task which has transfer task
|
||||
* @param[in] client: sql client or trans
|
||||
* @return OB_SUCCESS if success, otherwise failed
|
||||
* */
|
||||
static int load_need_transfer_task(const uint64_t tenant_id,
|
||||
ObBalanceTaskIArray &task_array,
|
||||
ObISQLClient &client);
|
||||
|
||||
};
|
||||
#undef IS_BALANCE_TASK
|
||||
}
|
||||
|
Reference in New Issue
Block a user