[CP] fix standby replay 4719

This commit is contained in:
maosy
2024-02-08 20:32:20 +00:00
committed by ob-robot
parent 9005f0a5a9
commit b325b511ca
10 changed files with 279 additions and 147 deletions

View File

@ -576,10 +576,14 @@ TEST_F(TestBalanceOperator, ls_balance_helper)
common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2();
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy)); ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy));
ObBalanceTaskHelper new_task; ObArray<ObBalanceTaskHelper> new_task;
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task)); SCN op_scn;
ASSERT_EQ(new_task.get_operation_scn().convert_to_ts(), ts); ASSERT_EQ(OB_SUCCESS, op_scn.convert_from_ts(ts));
ASSERT_EQ(ls_group_id, new_task.get_ls_group_id()); ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
tenant_id, sql_proxy, op_scn, new_task));
ASSERT_EQ(1, new_task.count());
ASSERT_EQ(new_task.at(0).get_operation_scn().convert_to_ts(), ts);
ASSERT_EQ(ls_group_id, new_task.at(0).get_ls_group_id());
ts += 10000000; ts += 10000000;
ObBalanceTaskHelperOp new_op(0); ObBalanceTaskHelperOp new_op(0);
ls_group_id = 1001; ls_group_id = 1001;
@ -588,14 +592,20 @@ TEST_F(TestBalanceOperator, ls_balance_helper)
tenant_id, scn, new_op, tenant_id, scn, new_op,
src_ls_id, dest_ls_id, ls_group_id)); src_ls_id, dest_ls_id, ls_group_id));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy)); ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task)); ASSERT_EQ(OB_SUCCESS, op_scn.convert_from_ts(ts));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
tenant_id, sql_proxy, op_scn, new_task));
ASSERT_EQ(2, new_task.count());
ts = 10000000000000; ts = 10000000000000;
ASSERT_EQ(new_task.get_operation_scn().convert_to_ts(), ts); ASSERT_EQ(new_task.at(0).get_operation_scn().convert_to_ts(), ts);
ASSERT_EQ(new_task.get_ls_group_id(), 0); ASSERT_EQ(new_task.at(0).get_ls_group_id(), 0);
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::remove_task(tenant_id, new_task.get_operation_scn(), sql_proxy)); ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::remove_task(tenant_id,
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task)); new_task.at(0).get_operation_scn(), sql_proxy));
ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
tenant_id, sql_proxy, op_scn, new_task));
ASSERT_EQ(1, new_task.count());
ASSERT_EQ(ls_group_id, new_task.get_ls_group_id()); ASSERT_EQ(ls_group_id, new_task.at(0).get_ls_group_id());
} }
} // namespace share } // namespace share
} // namespace oceanbase } // namespace oceanbase

View File

@ -1077,10 +1077,9 @@ int ObRecoveryLSService::do_standby_balance_()
int ObRecoveryLSService::do_ls_balance_task_() int ObRecoveryLSService::do_ls_balance_task_()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObBalanceTaskHelper ls_balance_task; ObArray<ObBalanceTaskHelper> ls_balance_tasks;
ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*); ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*);
ObAllTenantInfo tenant_info; ObAllTenantInfo tenant_info;
bool has_next_task = true;
if (OB_UNLIKELY(!inited_)) { if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_)); LOG_WARN("not init", KR(ret), K(inited_));
@ -1090,43 +1089,122 @@ int ObRecoveryLSService::do_ls_balance_task_()
} else if (OB_ISNULL(tenant_info_loader)) { } else if (OB_ISNULL(tenant_info_loader)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("mtl pointer is null", KR(ret), KP(tenant_info_loader)); LOG_WARN("mtl pointer is null", KR(ret), KP(tenant_info_loader));
}
while (OB_SUCC(ret) && has_next_task) {
ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_,
*proxy_, ls_balance_task);
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
has_next_task = false;
} else if (OB_FAIL(ret)) {
LOG_WARN("failed to get balance task", KR(ret), K(tenant_id_));
} else if (has_set_stop()) {
ret = OB_IN_STOP_STATE;
LOG_WARN("thread is in stop state", KR(ret));
} else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) { } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) {
LOG_WARN("get_tenant_info failed", K(ret)); LOG_WARN("get_tenant_info failed", K(ret));
} else if (tenant_info.get_standby_scn() >= ls_balance_task.get_operation_scn()) { } else if (OB_FAIL(ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(
tenant_id_, *proxy_, tenant_info.get_standby_scn(),
ls_balance_tasks))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
LOG_INFO("no need process balance task", K(tenant_info));
} else {
LOG_WARN("failed to load task", KR(ret), K(tenant_id_), K(tenant_info));
}
} else {
//使用接口保证获取到的ls_balance_task都是可读点越过,可以处理的task,task是按照顺序删除的,
//这个是必须要保证的
for (int64_t i = 0; OB_SUCC(ret) && i < ls_balance_tasks.count() && !has_set_stop(); ++i) {
const ObBalanceTaskHelper &ls_balance_task = ls_balance_tasks.at(i);
//按顺序梳理task,不能如果一个task处理失败就失败掉
if (OB_FAIL(try_do_ls_balance_task_(ls_balance_task, tenant_info))) {
LOG_WARN("failed to ls balance task", KR(ret), K(ls_balance_task), K(tenant_info));
}
}
}
return ret;
}
int ObRecoveryLSService::try_do_ls_balance_task_(
const share::ObBalanceTaskHelper &ls_balance_task,
const share::ObAllTenantInfo &tenant_info)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
} else if (OB_UNLIKELY(!ls_balance_task.is_valid() || !tenant_info.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(ls_balance_task), K(tenant_info));
} else if (OB_ISNULL(proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql is null", KR(ret), KP(proxy_));
} else {
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_); const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
//transfer_scn maybe sync_scn bool can_remove = false;
SCN transfer_scn;
START_TRANSACTION(proxy_, exec_tenant_id) START_TRANSACTION(proxy_, exec_tenant_id)
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
LOG_WARN("failed to start trans", KR(ret)); LOG_WARN("failed to start trans", KR(ret));
} else if (ls_balance_task.get_task_op().is_ls_alter()) { } else if (ls_balance_task.get_task_op().is_ls_alter()) {
can_remove = true;
if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) { if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) {
LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task)); LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task));
} }
} else if (ls_balance_task.get_task_op().is_transfer_end()) { } else if (ls_balance_task.get_task_op().is_transfer_end()) {
transfer_scn = ls_balance_task.get_operation_scn(); can_remove = true;
bool is_replay_finish = true;
if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(
tenant_id_, ls_balance_task.get_src_ls(),
ls_balance_task.get_dest_ls(), ls_balance_task.get_operation_scn(),
is_replay_finish))) {
LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_),
K(ls_balance_task), K(tenant_info));
} else if (!is_replay_finish) {
ret = OB_NEED_RETRY;
LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task));
}
} else if (ls_balance_task.get_task_op().is_transfer_begin()) { } else if (ls_balance_task.get_task_op().is_transfer_begin()) {
if (OB_FAIL(check_transfer_begin_can_remove_(ls_balance_task, tenant_info, can_remove))) {
LOG_WARN("failed to check transfer begin can remove", KR(ret),
K(ls_balance_task), K(tenant_info));
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls balance task op is unexpected", KR(ret), K(ls_balance_task));
}
if (OB_FAIL(ret)) {
} else if (!can_remove) {
LOG_INFO("balance task helper can not remove", K(ls_balance_task), K(tenant_info));
} else if (OB_FAIL(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_,
ls_balance_task.get_operation_scn(), trans))) {
LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
} else {
LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
}
END_TRANSACTION(trans)
}
return ret;
}
int ObRecoveryLSService::check_transfer_begin_can_remove_(
const share::ObBalanceTaskHelper &ls_balance_task,
const share::ObAllTenantInfo &tenant_info,
bool &can_remove)
{
int ret = OB_SUCCESS;
can_remove = true;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K(inited_));
} else if (OB_UNLIKELY(!ls_balance_task.is_valid()
|| !ls_balance_task.get_task_op().is_transfer_begin())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(ls_balance_task));
} else if (OB_ISNULL(proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sql can't null", K(ret), K(proxy_));
} else {
//find transfer end, or tenant is in flashback //find transfer end, or tenant is in flashback
ObBalanceTaskHelper transfer_end_task; ObBalanceTaskHelper transfer_end_task;
SCN transfer_scn;
bool is_replay_finish = false;
ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_, ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_,
ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(), ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(),
ls_balance_task.get_dest_ls(), trans, transfer_end_task); ls_balance_task.get_dest_ls(), *proxy_, transfer_end_task);
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
//if has transfer end, can remove transfer begin //if has transfer end, can remove transfer begin
transfer_scn = ls_balance_task.get_operation_scn(); transfer_scn = ls_balance_task.get_operation_scn();
LOG_INFO("has transfer end task", KR(ret), K(ls_balance_task), K(transfer_end_task)); LOG_INFO("has transfer end task, can remove transfer begin", KR(ret),
K(ls_balance_task), K(transfer_end_task));
} else if (OB_ENTRY_NOT_EXIST != ret) { } else if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("failed to find transfer end task", KR(ret), K(tenant_id_), K(ls_balance_task)); LOG_WARN("failed to find transfer end task", KR(ret), K(tenant_id_), K(ls_balance_task));
} else if (tenant_info.is_prepare_flashback_for_switch_to_primary_status() } else if (tenant_info.is_prepare_flashback_for_switch_to_primary_status()
@ -1135,24 +1213,19 @@ int ObRecoveryLSService::do_ls_balance_task_()
ret = OB_SUCCESS; ret = OB_SUCCESS;
transfer_scn = tenant_info.get_sync_scn(); transfer_scn = tenant_info.get_sync_scn();
if (tenant_info.get_sync_scn() != tenant_info.get_standby_scn()) { if (tenant_info.get_sync_scn() != tenant_info.get_standby_scn()) {
ret = OB_NEED_WAIT; can_remove = false;
LOG_WARN("must wait repay to newest", KR(ret), K(tenant_id_), K(tenant_info), LOG_WARN("There are transfer tasks in progress. Must wait for replay to newest",
K(ls_balance_task)); KR(ret), K(tenant_id_), K(tenant_info), K(ls_balance_task));
} else { } else {
LOG_INFO("replay to newest", K(tenant_info), K(ls_balance_task)); LOG_INFO("replay to newest, can remove transfer begin before switchover/failover to primary",
K(tenant_info), K(ls_balance_task));
} }
} else { } else {
ret = OB_NEED_RETRY; can_remove = false;
LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task)); LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task));
ret = OB_SUCCESS;
} }
} if (OB_FAIL(ret) || !can_remove) {
if (OB_FAIL(ret)) {
} else if (ls_balance_task.get_task_op().is_transfer_begin()
|| ls_balance_task.get_task_op().is_transfer_end()) {
bool is_replay_finish = false;
if (!transfer_scn.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("transfer scn is invalid", KR(ret), K(transfer_scn), K(tenant_info), K(ls_balance_task));
} else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay( } else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay(
tenant_id_, ls_balance_task.get_src_ls(), tenant_id_, ls_balance_task.get_src_ls(),
ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) { ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) {
@ -1162,26 +1235,11 @@ int ObRecoveryLSService::do_ls_balance_task_()
ret = OB_NEED_RETRY; ret = OB_NEED_RETRY;
LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn)); LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn));
} }
} }
if (FAILEDx(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_,
ls_balance_task.get_operation_scn(), trans))) {
LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task));
} else {
LOG_INFO("task can be remove", KR(ret), K(ls_balance_task));
}
END_TRANSACTION(trans)
} else {
if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s
LOG_INFO("can not remove ls balance task helper", K(ls_balance_task), K(tenant_info));
}
has_next_task = false;
}
}//end while
return ret; return ret;
} }
int ObRecoveryLSService::do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task, int ObRecoveryLSService::do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task,
common::ObMySQLTransaction &trans) common::ObMySQLTransaction &trans)
{ {

View File

@ -136,6 +136,11 @@ private:
//thread1 //thread1
int do_standby_balance_(); int do_standby_balance_();
int do_ls_balance_task_(); int do_ls_balance_task_();
int try_do_ls_balance_task_(const share::ObBalanceTaskHelper &ls_balance_task,
const share::ObAllTenantInfo &tenant_info);
int check_transfer_begin_can_remove_(const share::ObBalanceTaskHelper &ls_balance_task,
const share::ObAllTenantInfo &tenant_info,
bool &can_remove);
int do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task, int do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task,
common::ObMySQLTransaction &trans); common::ObMySQLTransaction &trans);
int reset_restore_proxy_(ObRestoreSourceServiceAttr &service_attr); int reset_restore_proxy_(ObRestoreSourceServiceAttr &service_attr);

View File

@ -691,7 +691,7 @@ int ObTenantBalanceService::lock_and_check_balance_job_(
trans, trans,
tenant_id, tenant_id,
OB_ALL_BALANCE_JOB_TID, OB_ALL_BALANCE_JOB_TID,
EXCLUSIVE))) { EXCLUSIVE, false))) {
LOG_WARN("lock inner table failed", KR(ret), K(tenant_id)); LOG_WARN("lock inner table failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job( } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
tenant_id, tenant_id,

View File

@ -525,13 +525,15 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_()
LOG_INFO("data version is smaller than 4200, no need check", K(user_compat_version)); LOG_INFO("data version is smaller than 4200, no need check", K(user_compat_version));
} else { } else {
bool is_finish = false; bool is_finish = false;
ObBalanceTaskHelper ls_balance_task; ObArray<ObBalanceTaskHelper> ls_balance_tasks;
ObBalanceTaskArray balance_task_array; ObBalanceTaskArray balance_task_array;
share::ObAllTenantInfo cur_tenant_info; share::ObAllTenantInfo cur_tenant_info;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
SCN max_scn;
max_scn.set_max();
while (!THIS_WORKER.is_timeout() && OB_SUCC(ret) && !is_finish) { while (!THIS_WORKER.is_timeout() && OB_SUCC(ret) && !is_finish) {
if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_, if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(tenant_id_,
*sql_proxy_, ls_balance_task))) { *sql_proxy_, max_scn, ls_balance_tasks))) {
} else if (OB_ENTRY_NOT_EXIST == ret) { } else if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS; ret = OB_SUCCESS;
balance_task_array.reset(); balance_task_array.reset();
@ -579,7 +581,7 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_()
LOG_WARN("failed to notify recovery ls service", KR(tmp_ret)); LOG_WARN("failed to notify recovery ls service", KR(tmp_ret));
} }
usleep(100L * 1000L); usleep(100L * 1000L);
LOG_INFO("has balance task not finish", K(ls_balance_task), LOG_INFO("has balance task not finish", K(ls_balance_tasks),
K(balance_task_array), K(cur_tenant_info)); K(balance_task_array), K(cur_tenant_info));
} }
} }

View File

@ -1391,7 +1391,7 @@ int ObTenantSnapshotUtil::check_tenant_is_in_transfer_procedure_(
trans, trans,
tenant_id, tenant_id,
OB_ALL_BALANCE_JOB_TID, OB_ALL_BALANCE_JOB_TID,
EXCLUSIVE))) { EXCLUSIVE, false/*is_from_sql*/))) {
LOG_WARN("lock inner table failed", KR(ret), K(tenant_id)); LOG_WARN("lock inner table failed", KR(ret), K(tenant_id));
} else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job( } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job(
tenant_id, tenant_id,

View File

@ -163,6 +163,22 @@ bool ObBalanceTaskHelper::is_valid() const
return operation_scn_.is_valid() && balance_task_helper_meta_.is_valid() && is_valid_tenant_id(tenant_id_); return operation_scn_.is_valid() && balance_task_helper_meta_.is_valid() && is_valid_tenant_id(tenant_id_);
} }
int ObBalanceTaskHelper::assign(const ObBalanceTaskHelper &other)
{
int ret = OB_SUCCESS;
if (this != &other) {
reset();
if (OB_FAIL(balance_task_helper_meta_.assign(other.balance_task_helper_meta_))) {
LOG_WARN("failed to assign balance task helper meta", KR(ret), K(other));
} else {
operation_scn_ = other.operation_scn_;
tenant_id_ = other.tenant_id_;
}
}
return ret;
}
void ObBalanceTaskHelper::reset() void ObBalanceTaskHelper::reset()
{ {
operation_scn_.reset(); operation_scn_.reset();
@ -192,29 +208,32 @@ int ObBalanceTaskHelperTableOperator::insert_ls_balance_task(const ObBalanceTask
return ret; return ret;
} }
int ObBalanceTaskHelperTableOperator::pop_task(const uint64_t tenant_id, int ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(const uint64_t tenant_id,
ObISQLClient &client, ObISQLClient &client,
ObBalanceTaskHelper &ls_balance_task) const share::SCN &max_operation_scn,
ObIArray<ObBalanceTaskHelper> &ls_balance_task)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ls_balance_task.reset(); ls_balance_task.reset();
ObSqlString sql; ObSqlString sql;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|| !max_operation_scn.is_valid())) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(max_operation_scn));
} else if (OB_FAIL(sql.assign_fmt("select * from %s order by operation_scn asc limit 1", } else if (OB_FAIL(sql.assign_fmt("select * from %s where operation_scn <= %lu order by operation_scn",
OB_ALL_BALANCE_TASK_HELPER_TNAME))) { OB_ALL_BALANCE_TASK_HELPER_TNAME,
LOG_WARN("failed to assign sql", KR(ret), K(sql)); max_operation_scn.get_val_for_inner_table_field()))) {
} else if (OB_FAIL(exec_get_single_row_(sql, tenant_id, client, ls_balance_task))) { LOG_WARN("failed to assign sql", KR(ret), K(max_operation_scn), K(sql));
} else if (OB_FAIL(exec_get_rows_(sql, tenant_id, client, ls_balance_task))) {
LOG_WARN("failed to get row", KR(ret), K(sql), K(tenant_id)); LOG_WARN("failed to get row", KR(ret), K(sql), K(tenant_id));
} }
return ret; return ret;
} }
int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlString &sql, int ObBalanceTaskHelperTableOperator::exec_get_rows_(const common::ObSqlString &sql,
const uint64_t tenant_id, const uint64_t tenant_id,
ObISQLClient &client, ObISQLClient &client,
ObBalanceTaskHelper &ls_balance_task) ObIArray<ObBalanceTaskHelper> &ls_balance_tasks)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || sql.empty())) { if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || sql.empty())) {
@ -229,14 +248,9 @@ int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlSt
} else if (OB_ISNULL(result = res.get_result())) { } else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get sql result", KR(ret)); LOG_WARN("failed to get sql result", KR(ret));
} else if (OB_FAIL(result->next())) {
if (OB_ITER_END == ret) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("empty ls_balance_task", KR(ret), K(sql));
} else {
LOG_WARN("failed to get ls_balance_task", KR(ret), K(sql));
}
} else { } else {
while(OB_SUCC(ret) && OB_SUCC(result->next())) {
ObBalanceTaskHelper task_help;
ObString task_op_str; ObString task_op_str;
uint64_t op_scn_value = 0; uint64_t op_scn_value = 0;
SCN op_scn; SCN op_scn;
@ -252,11 +266,23 @@ int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlSt
LOG_WARN("failed to get cell", KR(ret), K(task_op_str), K(op_scn), K(src_ls), K(dest_ls), K(ls_group_id)); LOG_WARN("failed to get cell", KR(ret), K(task_op_str), K(op_scn), K(src_ls), K(dest_ls), K(ls_group_id));
} else if (OB_FAIL(op_scn.convert_for_inner_table_field(op_scn_value))) { } else if (OB_FAIL(op_scn.convert_for_inner_table_field(op_scn_value))) {
LOG_WARN("failed to convert for inner table", KR(ret), K(op_scn_value)); LOG_WARN("failed to convert for inner table", KR(ret), K(op_scn_value));
} else if (OB_FAIL(ls_balance_task.init(tenant_id, op_scn, ObBalanceTaskHelperOp(task_op_str), ObLSID(src_ls), } else if (OB_FAIL(task_help.init(tenant_id, op_scn, ObBalanceTaskHelperOp(task_op_str), ObLSID(src_ls),
ObLSID(dest_ls), ls_group_id))) { ObLSID(dest_ls), ls_group_id))) {
LOG_WARN("failed to init ls_balance_task", KR(ret), K(tenant_id), K(task_op_str), K(op_scn), K(src_ls), K(ls_group_id)); LOG_WARN("failed to init ls_balance_task", KR(ret), K(tenant_id), K(task_op_str), K(op_scn), K(src_ls), K(ls_group_id));
} else if (OB_FAIL(ls_balance_tasks.push_back(task_help))) {
LOG_WARN("failed to push back task", KR(ret), K(task_help));
} }
}//end while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
if (0 == ls_balance_tasks.count()) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("no balance task help", KR(ret), K(sql));
} }
} else {
LOG_WARN("failed to get cell", KR(ret));
}
}//end else
} }
} }
return ret; return ret;
@ -313,7 +339,6 @@ int ObBalanceTaskHelperTableOperator::try_find_transfer_end(const uint64_t tenan
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObSqlString sql; ObSqlString sql;
ObBalanceTaskHelperOp op(ObBalanceTaskHelperOp::LS_BALANCE_TASK_OP_TRANSFER_END); ObBalanceTaskHelperOp op(ObBalanceTaskHelperOp::LS_BALANCE_TASK_OP_TRANSFER_END);
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id);
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !op_scn.is_valid() if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !op_scn.is_valid()
|| !src_ls.is_valid() || !dest_ls.is_valid())) { || !src_ls.is_valid() || !dest_ls.is_valid())) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
@ -328,6 +353,28 @@ int ObBalanceTaskHelperTableOperator::try_find_transfer_end(const uint64_t tenan
return ret; return ret;
} }
int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlString &sql,
const uint64_t tenant_id, ObISQLClient &client,
ObBalanceTaskHelper &ls_balance_task)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || sql.empty())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(sql));
} else {
ObArray<ObBalanceTaskHelper> task_array;
if (OB_FAIL(exec_get_rows_(sql, tenant_id, client, task_array))) {
LOG_WARN("failed to get single row", KR(ret), K(sql), K(tenant_id));
} else if (1 != task_array.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("task array count is not expected", KR(ret), K(sql), K(task_array));
} else if (OB_FAIL(ls_balance_task.assign(task_array.at(0)))) {
LOG_WARN("failed to assign balance task", KR(ret), K(task_array));
}
}
return ret;
}
}//end of share }//end of share
}//end of ob }//end of ob

View File

@ -118,6 +118,7 @@ public:
const share::ObLSID &dest_ls, const share::ObLSID &dest_ls,
const uint64_t ls_group_id); const uint64_t ls_group_id);
bool is_valid() const; bool is_valid() const;
int assign(const ObBalanceTaskHelper &other);
TO_STRING_KV(K_(balance_task_helper_meta), K_(operation_scn)); TO_STRING_KV(K_(balance_task_helper_meta), K_(operation_scn));
share::SCN get_operation_scn() const share::SCN get_operation_scn() const
{ {
@ -161,18 +162,20 @@ public:
static int insert_ls_balance_task(const ObBalanceTaskHelper &ls_balance_task, static int insert_ls_balance_task(const ObBalanceTaskHelper &ls_balance_task,
ObISQLClient &client); ObISQLClient &client);
/** /**
* @description: get task has min operation scn * @description: get task which operation scn less than max_scn
* @param[in] tenant_id : user_tenant_id * @param[in] tenant_id : user_tenant_id
* @param[in] client : sql client or trans * @param[in] client : sql client or trans
* @param[out] ls_balance_task : ls_balance_task of min operation_scn * @param[in] max_operation_scn : max_scn
* @param[out] ls_balance_tasks : ls_balance_task's operation_scn less than max_scn
* @return : * @return :
* OB_SUCCESS : get a valid ls_balance_task * OB_SUCCESS : get valid ls_balance_task array
* OB_ENTRY_NOT_EXIST : empty * OB_ENTRY_NOT_EXIST : empty
* OTHER : fail * OTHER : fail
*/ */
static int pop_task(const uint64_t tenant_id, static int load_tasks_order_by_scn(const uint64_t tenant_id,
ObISQLClient &client, ObISQLClient &client,
ObBalanceTaskHelper &ls_balance_task); const share::SCN &max_operation_scn,
ObIArray<ObBalanceTaskHelper> &ls_balance_task);
/** /**
* @description: remove task of operation_scn * @description: remove task of operation_scn
* @param[in] tenant_id : user_tenant_id * @param[in] tenant_id : user_tenant_id
@ -207,9 +210,12 @@ public:
ObISQLClient &client, ObISQLClient &client,
ObBalanceTaskHelper &ls_balance_task); ObBalanceTaskHelper &ls_balance_task);
private: private:
static int exec_get_single_row_(const common::ObSqlString &sql, static int exec_get_rows_(const common::ObSqlString &sql,
const uint64_t tenant_id, const uint64_t tenant_id,
ObISQLClient &client, ObISQLClient &client,
ObIArray<ObBalanceTaskHelper> &ls_balance_tasks);
static int exec_get_single_row_(const common::ObSqlString &sql,
const uint64_t tenant_id, ObISQLClient &client,
ObBalanceTaskHelper &ls_balance_task); ObBalanceTaskHelper &ls_balance_task);
}; };
} }

View File

@ -32,7 +32,8 @@ int ObInnerTableLockUtil::lock_inner_table_in_trans(
common::ObMySQLTransaction &trans, common::ObMySQLTransaction &trans,
const uint64_t tenant_id, const uint64_t tenant_id,
const uint64_t inner_table_id, const uint64_t inner_table_id,
const ObTableLockMode &lock_mode) const ObTableLockMode &lock_mode,
const bool is_from_sql)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObInnerSQLConnection *conn = NULL; ObInnerSQLConnection *conn = NULL;
@ -54,6 +55,7 @@ int ObInnerTableLockUtil::lock_inner_table_in_trans(
table_lock_arg.timeout_us_ = ctx.get_timeout(); table_lock_arg.timeout_us_ = ctx.get_timeout();
table_lock_arg.table_id_ = inner_table_id; table_lock_arg.table_id_ = inner_table_id;
table_lock_arg.op_type_ = IN_TRANS_COMMON_LOCK; table_lock_arg.op_type_ = IN_TRANS_COMMON_LOCK;
table_lock_arg.is_from_sql_ = is_from_sql;
if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(tenant_id, table_lock_arg, conn))) { if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(tenant_id, table_lock_arg, conn))) {
LOG_WARN("lock table failed", KR(ret), K(table_lock_arg)); LOG_WARN("lock table failed", KR(ret), K(table_lock_arg));
} }

View File

@ -47,6 +47,7 @@ public:
* @param[in] tenant_id: tenant_id of the inner table * @param[in] tenant_id: tenant_id of the inner table
* @param[in] inner_table_id: inner table id which you want to lock * @param[in] inner_table_id: inner table id which you want to lock
* @param[in] lock_mode: table lock mode * @param[in] lock_mode: table lock mode
* @param[in] is_from_sql: is from sql table_lock can retry
* @return * @return
* - OB_SUCCESS: lock inner table successfully * - OB_SUCCESS: lock inner table successfully
* - OB_TRY_LOCK_ROW_CONFLICT: lock conflict * - OB_TRY_LOCK_ROW_CONFLICT: lock conflict
@ -56,7 +57,8 @@ public:
common::ObMySQLTransaction &trans, common::ObMySQLTransaction &trans,
const uint64_t tenant_id, const uint64_t tenant_id,
const uint64_t inner_table_id, const uint64_t inner_table_id,
const ObTableLockMode &lock_mode); const ObTableLockMode &lock_mode,
const bool is_from_sql);
}; };
class ObLSObjLockUtil class ObLSObjLockUtil