diff --git a/mittest/simple_server/test_tenant_balance_operator.cpp b/mittest/simple_server/test_tenant_balance_operator.cpp index 32c756a611..73b73b5097 100644 --- a/mittest/simple_server/test_tenant_balance_operator.cpp +++ b/mittest/simple_server/test_tenant_balance_operator.cpp @@ -576,10 +576,14 @@ TEST_F(TestBalanceOperator, ls_balance_helper) common::ObMySQLProxy &sql_proxy = get_curr_simple_server().get_sql_proxy2(); ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy)); - ObBalanceTaskHelper new_task; - ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task)); - ASSERT_EQ(new_task.get_operation_scn().convert_to_ts(), ts); - ASSERT_EQ(ls_group_id, new_task.get_ls_group_id()); + ObArray new_task; + SCN op_scn; + ASSERT_EQ(OB_SUCCESS, op_scn.convert_from_ts(ts)); + ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn( + tenant_id, sql_proxy, op_scn, new_task)); + ASSERT_EQ(1, new_task.count()); + ASSERT_EQ(new_task.at(0).get_operation_scn().convert_to_ts(), ts); + ASSERT_EQ(ls_group_id, new_task.at(0).get_ls_group_id()); ts += 10000000; ObBalanceTaskHelperOp new_op(0); ls_group_id = 1001; @@ -588,14 +592,20 @@ TEST_F(TestBalanceOperator, ls_balance_helper) tenant_id, scn, new_op, src_ls_id, dest_ls_id, ls_group_id)); ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::insert_ls_balance_task(task, sql_proxy)); - ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task)); + ASSERT_EQ(OB_SUCCESS, op_scn.convert_from_ts(ts)); + ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn( + tenant_id, sql_proxy, op_scn, new_task)); + ASSERT_EQ(2, new_task.count()); ts = 10000000000000; - ASSERT_EQ(new_task.get_operation_scn().convert_to_ts(), ts); - ASSERT_EQ(new_task.get_ls_group_id(), 0); - ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::remove_task(tenant_id, new_task.get_operation_scn(), sql_proxy)); - ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::pop_task(tenant_id, sql_proxy, new_task)); + ASSERT_EQ(new_task.at(0).get_operation_scn().convert_to_ts(), ts); + ASSERT_EQ(new_task.at(0).get_ls_group_id(), 0); + ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::remove_task(tenant_id, + new_task.at(0).get_operation_scn(), sql_proxy)); + ASSERT_EQ(OB_SUCCESS, ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn( + tenant_id, sql_proxy, op_scn, new_task)); + ASSERT_EQ(1, new_task.count()); - ASSERT_EQ(ls_group_id, new_task.get_ls_group_id()); + ASSERT_EQ(ls_group_id, new_task.at(0).get_ls_group_id()); } } // namespace share } // namespace oceanbase diff --git a/src/rootserver/ob_recovery_ls_service.cpp b/src/rootserver/ob_recovery_ls_service.cpp index 37fd09f9c7..7756220ba1 100755 --- a/src/rootserver/ob_recovery_ls_service.cpp +++ b/src/rootserver/ob_recovery_ls_service.cpp @@ -1077,10 +1077,9 @@ int ObRecoveryLSService::do_standby_balance_() int ObRecoveryLSService::do_ls_balance_task_() { int ret = OB_SUCCESS; - ObBalanceTaskHelper ls_balance_task; + ObArray ls_balance_tasks; ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*); ObAllTenantInfo tenant_info; - bool has_next_task = true; if (OB_UNLIKELY(!inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", KR(ret), K(inited_)); @@ -1090,97 +1089,156 @@ int ObRecoveryLSService::do_ls_balance_task_() } else if (OB_ISNULL(tenant_info_loader)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("mtl pointer is null", KR(ret), KP(tenant_info_loader)); - } - while (OB_SUCC(ret) && has_next_task) { - ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_, - *proxy_, ls_balance_task); + } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) { + LOG_WARN("get_tenant_info failed", K(ret)); + } else if (OB_FAIL(ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn( + tenant_id_, *proxy_, tenant_info.get_standby_scn(), + ls_balance_tasks))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; - has_next_task = false; - } else if (OB_FAIL(ret)) { - LOG_WARN("failed to get balance task", KR(ret), K(tenant_id_)); - } else if (has_set_stop()) { - ret = OB_IN_STOP_STATE; - LOG_WARN("thread is in stop state", KR(ret)); - } else if (OB_FAIL(tenant_info_loader->get_tenant_info(tenant_info))) { - LOG_WARN("get_tenant_info failed", K(ret)); - } else if (tenant_info.get_standby_scn() >= ls_balance_task.get_operation_scn()) { - const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_); - //transfer_scn maybe sync_scn - SCN transfer_scn; - START_TRANSACTION(proxy_, exec_tenant_id) - if (OB_FAIL(ret)) { - LOG_WARN("failed to start trans", KR(ret)); - } else if (ls_balance_task.get_task_op().is_ls_alter()) { - if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) { - LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task)); - } - } else if (ls_balance_task.get_task_op().is_transfer_end()) { - transfer_scn = ls_balance_task.get_operation_scn(); - } else if (ls_balance_task.get_task_op().is_transfer_begin()) { - //find transfer end, or tenant is in flashback - ObBalanceTaskHelper transfer_end_task; - ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_, - ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(), - ls_balance_task.get_dest_ls(), trans, transfer_end_task); - if (OB_SUCC(ret)) { - //if has transfer end, can remove transfer begin - transfer_scn = ls_balance_task.get_operation_scn(); - LOG_INFO("has transfer end task", KR(ret), K(ls_balance_task), K(transfer_end_task)); - } else if (OB_ENTRY_NOT_EXIST != ret) { - LOG_WARN("failed to find transfer end task", KR(ret), K(tenant_id_), K(ls_balance_task)); - } else if (tenant_info.is_prepare_flashback_for_switch_to_primary_status() - || tenant_info.is_prepare_flashback_for_failover_to_primary_status()) { - //check tenant_info status and check wait readable_scn is equal to sync_scn - ret = OB_SUCCESS; - transfer_scn = tenant_info.get_sync_scn(); - if (tenant_info.get_sync_scn() != tenant_info.get_standby_scn()) { - ret = OB_NEED_WAIT; - LOG_WARN("must wait repay to newest", KR(ret), K(tenant_id_), K(tenant_info), - K(ls_balance_task)); - } else { - LOG_INFO("replay to newest", K(tenant_info), K(ls_balance_task)); - } - } else { - ret = OB_NEED_RETRY; - LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task)); - } - } - if (OB_FAIL(ret)) { - } else if (ls_balance_task.get_task_op().is_transfer_begin() - || ls_balance_task.get_task_op().is_transfer_end()) { - bool is_replay_finish = false; - if (!transfer_scn.is_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("transfer scn is invalid", KR(ret), K(transfer_scn), K(tenant_info), K(ls_balance_task)); - } else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay( - tenant_id_, ls_balance_task.get_src_ls(), - ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) { - LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), K(ls_balance_task), - K(tenant_info), K(transfer_scn)); - } else if (!is_replay_finish) { - ret = OB_NEED_RETRY; - LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn)); - } - } - if (FAILEDx(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_, - ls_balance_task.get_operation_scn(), trans))) { - LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task)); - } else { - LOG_INFO("task can be remove", KR(ret), K(ls_balance_task)); - } - END_TRANSACTION(trans) + LOG_INFO("no need process balance task", K(tenant_info)); } else { - if (REACH_TENANT_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s - LOG_INFO("can not remove ls balance task helper", K(ls_balance_task), K(tenant_info)); - } - has_next_task = false; + LOG_WARN("failed to load task", KR(ret), K(tenant_id_), K(tenant_info)); } - }//end while + } else { + //使用接口保证获取到的ls_balance_task都是可读点越过,可以处理的task,task是按照顺序删除的, + //这个是必须要保证的 + for (int64_t i = 0; OB_SUCC(ret) && i < ls_balance_tasks.count() && !has_set_stop(); ++i) { + const ObBalanceTaskHelper &ls_balance_task = ls_balance_tasks.at(i); + //按顺序梳理task,不能如果一个task处理失败就失败掉 + if (OB_FAIL(try_do_ls_balance_task_(ls_balance_task, tenant_info))) { + LOG_WARN("failed to ls balance task", KR(ret), K(ls_balance_task), K(tenant_info)); + } + } + } return ret; } +int ObRecoveryLSService::try_do_ls_balance_task_( + const share::ObBalanceTaskHelper &ls_balance_task, + const share::ObAllTenantInfo &tenant_info) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret), K(inited_)); + } else if (OB_UNLIKELY(!ls_balance_task.is_valid() || !tenant_info.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(ls_balance_task), K(tenant_info)); + } else if (OB_ISNULL(proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql is null", KR(ret), KP(proxy_)); + } else { + const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_); + bool can_remove = false; + START_TRANSACTION(proxy_, exec_tenant_id) + if (OB_FAIL(ret)) { + LOG_WARN("failed to start trans", KR(ret)); + } else if (ls_balance_task.get_task_op().is_ls_alter()) { + can_remove = true; + if (OB_FAIL(do_ls_balance_alter_task_(ls_balance_task, trans))) { + LOG_WARN("failed to do ls alter task", KR(ret), K(ls_balance_task)); + } + } else if (ls_balance_task.get_task_op().is_transfer_end()) { + can_remove = true; + bool is_replay_finish = true; + if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay( + tenant_id_, ls_balance_task.get_src_ls(), + ls_balance_task.get_dest_ls(), ls_balance_task.get_operation_scn(), + is_replay_finish))) { + LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), + K(ls_balance_task), K(tenant_info)); + } else if (!is_replay_finish) { + ret = OB_NEED_RETRY; + LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task)); + } + } else if (ls_balance_task.get_task_op().is_transfer_begin()) { + if (OB_FAIL(check_transfer_begin_can_remove_(ls_balance_task, tenant_info, can_remove))) { + LOG_WARN("failed to check transfer begin can remove", KR(ret), + K(ls_balance_task), K(tenant_info)); + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls balance task op is unexpected", KR(ret), K(ls_balance_task)); + } + if (OB_FAIL(ret)) { + } else if (!can_remove) { + LOG_INFO("balance task helper can not remove", K(ls_balance_task), K(tenant_info)); + } else if (OB_FAIL(ObBalanceTaskHelperTableOperator::remove_task(tenant_id_, + ls_balance_task.get_operation_scn(), trans))) { + LOG_WARN("failed to remove task", KR(ret), K(tenant_id_), K(ls_balance_task)); + } else { + LOG_INFO("task can be remove", KR(ret), K(ls_balance_task)); + } + END_TRANSACTION(trans) + } + return ret; +} +int ObRecoveryLSService::check_transfer_begin_can_remove_( + const share::ObBalanceTaskHelper &ls_balance_task, + const share::ObAllTenantInfo &tenant_info, + bool &can_remove) +{ + int ret = OB_SUCCESS; + can_remove = true; + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret), K(inited_)); + } else if (OB_UNLIKELY(!ls_balance_task.is_valid() + || !ls_balance_task.get_task_op().is_transfer_begin())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(ls_balance_task)); + } else if (OB_ISNULL(proxy_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("sql can't null", K(ret), K(proxy_)); + } else { + //find transfer end, or tenant is in flashback + ObBalanceTaskHelper transfer_end_task; + SCN transfer_scn; + bool is_replay_finish = false; + ret = ObBalanceTaskHelperTableOperator::try_find_transfer_end(tenant_id_, + ls_balance_task.get_operation_scn(), ls_balance_task.get_src_ls(), + ls_balance_task.get_dest_ls(), *proxy_, transfer_end_task); + if (OB_SUCC(ret)) { + //if has transfer end, can remove transfer begin + transfer_scn = ls_balance_task.get_operation_scn(); + LOG_INFO("has transfer end task, can remove transfer begin", KR(ret), + K(ls_balance_task), K(transfer_end_task)); + } else if (OB_ENTRY_NOT_EXIST != ret) { + LOG_WARN("failed to find transfer end task", KR(ret), K(tenant_id_), K(ls_balance_task)); + } else if (tenant_info.is_prepare_flashback_for_switch_to_primary_status() + || tenant_info.is_prepare_flashback_for_failover_to_primary_status()) { + //check tenant_info status and check wait readable_scn is equal to sync_scn + ret = OB_SUCCESS; + transfer_scn = tenant_info.get_sync_scn(); + if (tenant_info.get_sync_scn() != tenant_info.get_standby_scn()) { + can_remove = false; + LOG_WARN("There are transfer tasks in progress. Must wait for replay to newest", +KR(ret), K(tenant_id_), K(tenant_info), K(ls_balance_task)); + } else { + LOG_INFO("replay to newest, can remove transfer begin before switchover/failover to primary", + K(tenant_info), K(ls_balance_task)); + } + } else { + can_remove = false; + LOG_WARN("can not find transfer end task, can not end transfer begin task", KR(ret), K(tenant_info), K(ls_balance_task)); + ret = OB_SUCCESS; + } + if (OB_FAIL(ret) || !can_remove) { + } else if (OB_FAIL(ObLSServiceHelper::check_transfer_task_replay( + tenant_id_, ls_balance_task.get_src_ls(), + ls_balance_task.get_dest_ls(), transfer_scn, is_replay_finish))) { + LOG_WARN("failed to check transfer task replay", KR(ret), K(tenant_id_), K(ls_balance_task), + K(tenant_info), K(transfer_scn)); + } else if (!is_replay_finish) { + ret = OB_NEED_RETRY; + LOG_WARN("can not remove ls balance task helper", KR(ret), K(ls_balance_task), K(transfer_scn)); + } + + } + return ret; +} int ObRecoveryLSService::do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task, common::ObMySQLTransaction &trans) diff --git a/src/rootserver/ob_recovery_ls_service.h b/src/rootserver/ob_recovery_ls_service.h index 0b2ec7a29e..27d2b72b46 100755 --- a/src/rootserver/ob_recovery_ls_service.h +++ b/src/rootserver/ob_recovery_ls_service.h @@ -136,6 +136,11 @@ private: //thread1 int do_standby_balance_(); int do_ls_balance_task_(); + int try_do_ls_balance_task_(const share::ObBalanceTaskHelper &ls_balance_task, + const share::ObAllTenantInfo &tenant_info); + int check_transfer_begin_can_remove_(const share::ObBalanceTaskHelper &ls_balance_task, + const share::ObAllTenantInfo &tenant_info, + bool &can_remove); int do_ls_balance_alter_task_(const share::ObBalanceTaskHelper &ls_balance_task, common::ObMySQLTransaction &trans); int reset_restore_proxy_(ObRestoreSourceServiceAttr &service_attr); diff --git a/src/rootserver/ob_tenant_balance_service.cpp b/src/rootserver/ob_tenant_balance_service.cpp index e6af38ffdf..e307865a17 100755 --- a/src/rootserver/ob_tenant_balance_service.cpp +++ b/src/rootserver/ob_tenant_balance_service.cpp @@ -691,7 +691,7 @@ int ObTenantBalanceService::lock_and_check_balance_job_( trans, tenant_id, OB_ALL_BALANCE_JOB_TID, - EXCLUSIVE))) { + EXCLUSIVE, false))) { LOG_WARN("lock inner table failed", KR(ret), K(tenant_id)); } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job( tenant_id, diff --git a/src/rootserver/ob_tenant_role_transition_service.cpp b/src/rootserver/ob_tenant_role_transition_service.cpp index 91643b4130..f926fc6f47 100644 --- a/src/rootserver/ob_tenant_role_transition_service.cpp +++ b/src/rootserver/ob_tenant_role_transition_service.cpp @@ -525,13 +525,15 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_() LOG_INFO("data version is smaller than 4200, no need check", K(user_compat_version)); } else { bool is_finish = false; - ObBalanceTaskHelper ls_balance_task; + ObArray ls_balance_tasks; ObBalanceTaskArray balance_task_array; share::ObAllTenantInfo cur_tenant_info; int tmp_ret = OB_SUCCESS; + SCN max_scn; + max_scn.set_max(); while (!THIS_WORKER.is_timeout() && OB_SUCC(ret) && !is_finish) { - if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::pop_task(tenant_id_, - *sql_proxy_, ls_balance_task))) { + if (FALSE_IT(ret = ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(tenant_id_, + *sql_proxy_, max_scn, ls_balance_tasks))) { } else if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; balance_task_array.reset(); @@ -579,7 +581,7 @@ int ObTenantRoleTransitionService::wait_ls_balance_task_finish_() LOG_WARN("failed to notify recovery ls service", KR(tmp_ret)); } usleep(100L * 1000L); - LOG_INFO("has balance task not finish", K(ls_balance_task), + LOG_INFO("has balance task not finish", K(ls_balance_tasks), K(balance_task_array), K(cur_tenant_info)); } } diff --git a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.cpp b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.cpp index 44b69bb65b..c1bd1df9e8 100644 --- a/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.cpp +++ b/src/rootserver/tenant_snapshot/ob_tenant_snapshot_util.cpp @@ -1391,7 +1391,7 @@ int ObTenantSnapshotUtil::check_tenant_is_in_transfer_procedure_( trans, tenant_id, OB_ALL_BALANCE_JOB_TID, - EXCLUSIVE))) { + EXCLUSIVE, false/*is_from_sql*/))) { LOG_WARN("lock inner table failed", KR(ret), K(tenant_id)); } else if (OB_FAIL(ObBalanceJobTableOperator::get_balance_job( tenant_id, diff --git a/src/share/balance/ob_balance_task_helper_operator.cpp b/src/share/balance/ob_balance_task_helper_operator.cpp index e297270da2..110092dbfc 100644 --- a/src/share/balance/ob_balance_task_helper_operator.cpp +++ b/src/share/balance/ob_balance_task_helper_operator.cpp @@ -163,6 +163,22 @@ bool ObBalanceTaskHelper::is_valid() const return operation_scn_.is_valid() && balance_task_helper_meta_.is_valid() && is_valid_tenant_id(tenant_id_); } +int ObBalanceTaskHelper::assign(const ObBalanceTaskHelper &other) +{ + int ret = OB_SUCCESS; + if (this != &other) { + reset(); + if (OB_FAIL(balance_task_helper_meta_.assign(other.balance_task_helper_meta_))) { + LOG_WARN("failed to assign balance task helper meta", KR(ret), K(other)); + } else { + operation_scn_ = other.operation_scn_; + tenant_id_ = other.tenant_id_; + } + } + return ret; +} + + void ObBalanceTaskHelper::reset() { operation_scn_.reset(); @@ -192,29 +208,32 @@ int ObBalanceTaskHelperTableOperator::insert_ls_balance_task(const ObBalanceTask return ret; } -int ObBalanceTaskHelperTableOperator::pop_task(const uint64_t tenant_id, +int ObBalanceTaskHelperTableOperator::load_tasks_order_by_scn(const uint64_t tenant_id, ObISQLClient &client, - ObBalanceTaskHelper &ls_balance_task) + const share::SCN &max_operation_scn, + ObIArray &ls_balance_task) { int ret = OB_SUCCESS; ls_balance_task.reset(); ObSqlString sql; - if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id + || !max_operation_scn.is_valid())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant id", KR(ret), K(tenant_id)); - } else if (OB_FAIL(sql.assign_fmt("select * from %s order by operation_scn asc limit 1", - OB_ALL_BALANCE_TASK_HELPER_TNAME))) { - LOG_WARN("failed to assign sql", KR(ret), K(sql)); - } else if (OB_FAIL(exec_get_single_row_(sql, tenant_id, client, ls_balance_task))) { + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(max_operation_scn)); + } else if (OB_FAIL(sql.assign_fmt("select * from %s where operation_scn <= %lu order by operation_scn", + OB_ALL_BALANCE_TASK_HELPER_TNAME, + max_operation_scn.get_val_for_inner_table_field()))) { + LOG_WARN("failed to assign sql", KR(ret), K(max_operation_scn), K(sql)); + } else if (OB_FAIL(exec_get_rows_(sql, tenant_id, client, ls_balance_task))) { LOG_WARN("failed to get row", KR(ret), K(sql), K(tenant_id)); } return ret; } -int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlString &sql, +int ObBalanceTaskHelperTableOperator::exec_get_rows_(const common::ObSqlString &sql, const uint64_t tenant_id, ObISQLClient &client, - ObBalanceTaskHelper &ls_balance_task) + ObIArray &ls_balance_tasks) { int ret = OB_SUCCESS; if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || sql.empty())) { @@ -229,34 +248,41 @@ int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlSt } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("failed to get sql result", KR(ret)); - } else if (OB_FAIL(result->next())) { - if (OB_ITER_END == ret) { - ret = OB_ENTRY_NOT_EXIST; - LOG_WARN("empty ls_balance_task", KR(ret), K(sql)); - } else { - LOG_WARN("failed to get ls_balance_task", KR(ret), K(sql)); - } } else { - ObString task_op_str; - uint64_t op_scn_value = 0; - SCN op_scn; - int64_t src_ls = 0; - int64_t dest_ls = 0; - uint64_t ls_group_id = OB_INVALID_ID; - EXTRACT_UINT_FIELD_MYSQL(*result, "operation_scn", op_scn_value, uint64_t); - EXTRACT_INT_FIELD_MYSQL(*result, "src_ls", src_ls, int64_t); - EXTRACT_INT_FIELD_MYSQL(*result, "dest_ls", dest_ls, int64_t); - EXTRACT_INT_FIELD_MYSQL(*result, "ls_group_id", ls_group_id, uint64_t); - EXTRACT_VARCHAR_FIELD_MYSQL(*result, "operation_type", task_op_str); - if (OB_FAIL(ret)) { - LOG_WARN("failed to get cell", KR(ret), K(task_op_str), K(op_scn), K(src_ls), K(dest_ls), K(ls_group_id)); - } else if (OB_FAIL(op_scn.convert_for_inner_table_field(op_scn_value))) { - LOG_WARN("failed to convert for inner table", KR(ret), K(op_scn_value)); - } else if (OB_FAIL(ls_balance_task.init(tenant_id, op_scn, ObBalanceTaskHelperOp(task_op_str), ObLSID(src_ls), - ObLSID(dest_ls), ls_group_id))) { - LOG_WARN("failed to init ls_balance_task", KR(ret), K(tenant_id), K(task_op_str), K(op_scn), K(src_ls), K(ls_group_id)); + while(OB_SUCC(ret) && OB_SUCC(result->next())) { + ObBalanceTaskHelper task_help; + ObString task_op_str; + uint64_t op_scn_value = 0; + SCN op_scn; + int64_t src_ls = 0; + int64_t dest_ls = 0; + uint64_t ls_group_id = OB_INVALID_ID; + EXTRACT_UINT_FIELD_MYSQL(*result, "operation_scn", op_scn_value, uint64_t); + EXTRACT_INT_FIELD_MYSQL(*result, "src_ls", src_ls, int64_t); + EXTRACT_INT_FIELD_MYSQL(*result, "dest_ls", dest_ls, int64_t); + EXTRACT_INT_FIELD_MYSQL(*result, "ls_group_id", ls_group_id, uint64_t); + EXTRACT_VARCHAR_FIELD_MYSQL(*result, "operation_type", task_op_str); + if (OB_FAIL(ret)) { + LOG_WARN("failed to get cell", KR(ret), K(task_op_str), K(op_scn), K(src_ls), K(dest_ls), K(ls_group_id)); + } else if (OB_FAIL(op_scn.convert_for_inner_table_field(op_scn_value))) { + LOG_WARN("failed to convert for inner table", KR(ret), K(op_scn_value)); + } else if (OB_FAIL(task_help.init(tenant_id, op_scn, ObBalanceTaskHelperOp(task_op_str), ObLSID(src_ls), + ObLSID(dest_ls), ls_group_id))) { + LOG_WARN("failed to init ls_balance_task", KR(ret), K(tenant_id), K(task_op_str), K(op_scn), K(src_ls), K(ls_group_id)); + } else if (OB_FAIL(ls_balance_tasks.push_back(task_help))) { + LOG_WARN("failed to push back task", KR(ret), K(task_help)); + } + }//end while + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + if (0 == ls_balance_tasks.count()) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("no balance task help", KR(ret), K(sql)); + } + } else { + LOG_WARN("failed to get cell", KR(ret)); } - } + }//end else } } return ret; @@ -313,7 +339,6 @@ int ObBalanceTaskHelperTableOperator::try_find_transfer_end(const uint64_t tenan int ret = OB_SUCCESS; ObSqlString sql; ObBalanceTaskHelperOp op(ObBalanceTaskHelperOp::LS_BALANCE_TASK_OP_TRANSFER_END); - const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || !op_scn.is_valid() || !src_ls.is_valid() || !dest_ls.is_valid())) { ret = OB_INVALID_ARGUMENT; @@ -328,6 +353,28 @@ int ObBalanceTaskHelperTableOperator::try_find_transfer_end(const uint64_t tenan return ret; } +int ObBalanceTaskHelperTableOperator::exec_get_single_row_(const common::ObSqlString &sql, + const uint64_t tenant_id, ObISQLClient &client, + ObBalanceTaskHelper &ls_balance_task) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || sql.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(sql)); + } else { + ObArray task_array; + if (OB_FAIL(exec_get_rows_(sql, tenant_id, client, task_array))) { + LOG_WARN("failed to get single row", KR(ret), K(sql), K(tenant_id)); + } else if (1 != task_array.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("task array count is not expected", KR(ret), K(sql), K(task_array)); + } else if (OB_FAIL(ls_balance_task.assign(task_array.at(0)))) { + LOG_WARN("failed to assign balance task", KR(ret), K(task_array)); + } + + } + return ret; +} }//end of share }//end of ob diff --git a/src/share/balance/ob_balance_task_helper_operator.h b/src/share/balance/ob_balance_task_helper_operator.h index 70f661bbb7..69a57fc634 100644 --- a/src/share/balance/ob_balance_task_helper_operator.h +++ b/src/share/balance/ob_balance_task_helper_operator.h @@ -118,6 +118,7 @@ public: const share::ObLSID &dest_ls, const uint64_t ls_group_id); bool is_valid() const; + int assign(const ObBalanceTaskHelper &other); TO_STRING_KV(K_(balance_task_helper_meta), K_(operation_scn)); share::SCN get_operation_scn() const { @@ -161,18 +162,20 @@ public: static int insert_ls_balance_task(const ObBalanceTaskHelper &ls_balance_task, ObISQLClient &client); /** - * @description: get task has min operation scn + * @description: get task which operation scn less than max_scn * @param[in] tenant_id : user_tenant_id * @param[in] client : sql client or trans - * @param[out] ls_balance_task : ls_balance_task of min operation_scn + * @param[in] max_operation_scn : max_scn + * @param[out] ls_balance_tasks : ls_balance_task's operation_scn less than max_scn * @return : - * OB_SUCCESS : get a valid ls_balance_task + * OB_SUCCESS : get valid ls_balance_task array * OB_ENTRY_NOT_EXIST : empty * OTHER : fail */ - static int pop_task(const uint64_t tenant_id, + static int load_tasks_order_by_scn(const uint64_t tenant_id, ObISQLClient &client, - ObBalanceTaskHelper &ls_balance_task); + const share::SCN &max_operation_scn, + ObIArray &ls_balance_task); /** * @description: remove task of operation_scn * @param[in] tenant_id : user_tenant_id @@ -207,10 +210,13 @@ public: ObISQLClient &client, ObBalanceTaskHelper &ls_balance_task); private: - static int exec_get_single_row_(const common::ObSqlString &sql, + static int exec_get_rows_(const common::ObSqlString &sql, const uint64_t tenant_id, ObISQLClient &client, - ObBalanceTaskHelper &ls_balance_task); + ObIArray &ls_balance_tasks); + static int exec_get_single_row_(const common::ObSqlString &sql, + const uint64_t tenant_id, ObISQLClient &client, + ObBalanceTaskHelper &ls_balance_task); }; } } diff --git a/src/storage/tablelock/ob_lock_utils.cpp b/src/storage/tablelock/ob_lock_utils.cpp index 12351c203c..69969e80e8 100644 --- a/src/storage/tablelock/ob_lock_utils.cpp +++ b/src/storage/tablelock/ob_lock_utils.cpp @@ -32,7 +32,8 @@ int ObInnerTableLockUtil::lock_inner_table_in_trans( common::ObMySQLTransaction &trans, const uint64_t tenant_id, const uint64_t inner_table_id, - const ObTableLockMode &lock_mode) + const ObTableLockMode &lock_mode, + const bool is_from_sql) { int ret = OB_SUCCESS; ObInnerSQLConnection *conn = NULL; @@ -54,6 +55,7 @@ int ObInnerTableLockUtil::lock_inner_table_in_trans( table_lock_arg.timeout_us_ = ctx.get_timeout(); table_lock_arg.table_id_ = inner_table_id; table_lock_arg.op_type_ = IN_TRANS_COMMON_LOCK; + table_lock_arg.is_from_sql_ = is_from_sql; if (OB_FAIL(ObInnerConnectionLockUtil::lock_table(tenant_id, table_lock_arg, conn))) { LOG_WARN("lock table failed", KR(ret), K(table_lock_arg)); } diff --git a/src/storage/tablelock/ob_lock_utils.h b/src/storage/tablelock/ob_lock_utils.h index 6ea9c3b8c6..6f37de7782 100644 --- a/src/storage/tablelock/ob_lock_utils.h +++ b/src/storage/tablelock/ob_lock_utils.h @@ -47,6 +47,7 @@ public: * @param[in] tenant_id: tenant_id of the inner table * @param[in] inner_table_id: inner table id which you want to lock * @param[in] lock_mode: table lock mode + * @param[in] is_from_sql: is from sql table_lock can retry * @return * - OB_SUCCESS: lock inner table successfully * - OB_TRY_LOCK_ROW_CONFLICT: lock conflict @@ -56,7 +57,8 @@ public: common::ObMySQLTransaction &trans, const uint64_t tenant_id, const uint64_t inner_table_id, - const ObTableLockMode &lock_mode); + const ObTableLockMode &lock_mode, + const bool is_from_sql); }; class ObLSObjLockUtil