diff --git a/src/logservice/ob_garbage_collector.cpp b/src/logservice/ob_garbage_collector.cpp index 03c0df57fc..161709d7bc 100644 --- a/src/logservice/ob_garbage_collector.cpp +++ b/src/logservice/ob_garbage_collector.cpp @@ -119,6 +119,7 @@ int ObGarbageCollector::QueryLSIsValidMemberFunctor::remove_self_from_learnerlis .by(MTL_ID()) .timeout(TIMEOUT_US) .max_process_handler_time(TIMEOUT_US) + .group_id(share::OBCG_STORAGE) .get_palf_stat(get_palf_stat_req, get_palf_stat_resp))) { CLOG_LOG(WARN, "get_palf_stat failed", K(ls_id), K(leader), K(get_palf_stat_req)); } else { @@ -160,6 +161,7 @@ int ObGarbageCollector::QueryLSIsValidMemberFunctor::handle_ls_array_(const ObAd if (OB_SUCCESS != (tmp_ret = rpc_proxy_->to(leader) .by(MTL_ID()) .timeout(TIMEOUT) + .group_id(share::OBCG_STORAGE) .query_ls_is_valid_member(request, response)) || (OB_SUCCESS != (tmp_ret = response.ret_value_))) { CLOG_LOG(WARN, "query_is_valid_member failed", K(tmp_ret), K(leader), K(request)); @@ -1364,7 +1366,7 @@ int ObGarbageCollector::get_ls_status_from_table(const ObLSID &ls_id, // sys tenant should always return LS_NORMAL if (OB_SYS_TENANT_ID == tenant_id) { ls_status = OB_LS_NORMAL; - } else if (OB_FAIL(ls_op.get_ls_status_info(tenant_id, ls_id, status_info, *sql_proxy_))) { + } else if (OB_FAIL(ls_op.get_ls_status_info(tenant_id, ls_id, status_info, *sql_proxy_, share::OBCG_STORAGE))) { CLOG_LOG(INFO, "failed to get ls status info from table", K(ret), K(tenant_id), K(ls_id)); } else { ls_status = status_info.status_; diff --git a/src/rootserver/ob_balance_group_ls_stat_operator.cpp b/src/rootserver/ob_balance_group_ls_stat_operator.cpp index 5169db1f81..6a93522438 100755 --- a/src/rootserver/ob_balance_group_ls_stat_operator.cpp +++ b/src/rootserver/ob_balance_group_ls_stat_operator.cpp @@ -1669,7 +1669,8 @@ int ObNewTableTabletAllocator::alloc_ls_for_duplicate_table_( } else if (OB_TMP_FAIL(ls_status_operator.get_duplicate_ls_status_info( tenant_id, *GCTX.sql_proxy_, - duplicate_ls_status_info))) { + duplicate_ls_status_info, + share::OBCG_DEFAULT/*group_id*/))) { if (OB_ENTRY_NOT_EXIST == tmp_ret) { LOG_INFO("duplicate log stream not exist, should create one duplicate log stream"); tmp_ret = OB_SUCCESS; diff --git a/src/rootserver/ob_tenant_thread_helper.cpp b/src/rootserver/ob_tenant_thread_helper.cpp index 5fbe6382e3..92c79bead3 100755 --- a/src/rootserver/ob_tenant_thread_helper.cpp +++ b/src/rootserver/ob_tenant_thread_helper.cpp @@ -314,7 +314,7 @@ int ObTenantThreadHelper::check_can_do_recovery_(const uint64_t tenant_id) if (OB_ISNULL(GCTX.sql_proxy_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql can't null", K(ret), K(GCTX.sql_proxy_)); - } else if (OB_FAIL(restore_table_operator.init(GCTX.sql_proxy_, tenant_id))) { + } else if (OB_FAIL(restore_table_operator.init(GCTX.sql_proxy_, tenant_id, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("fail to init restore table operator", KR(ret), K(tenant_id)); } else if (OB_FAIL(restore_table_operator.get_job_by_tenant_id(tenant_id, job_info))) { diff --git a/src/rootserver/restore/ob_recover_table_job_scheduler.cpp b/src/rootserver/restore/ob_recover_table_job_scheduler.cpp index 99e04ae8fa..947b3b601f 100644 --- a/src/rootserver/restore/ob_recover_table_job_scheduler.cpp +++ b/src/rootserver/restore/ob_recover_table_job_scheduler.cpp @@ -549,7 +549,7 @@ int ObRecoverTableJobScheduler::restore_aux_tenant_(share::ObRecoverTableJob &jo bool aux_tenant_restore_finish = true; int tmp_ret = OB_SUCCESS; DEBUG_SYNC(BEFORE_RESTORE_AUX_TENANT); - if (OB_FAIL(restore_helper.init(OB_SYS_TENANT_ID))) { + if (OB_FAIL(restore_helper.init(OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init retore helper", K(ret)); } else if (OB_FAIL(restore_helper.get_restore_job_history( *sql_proxy_, job.get_initiator_job_id(), job.get_initiator_tenant_id(), restore_history_info))) { @@ -594,7 +594,7 @@ int ObRecoverTableJobScheduler::active_aux_tenant_(share::ObRecoverTableJob &job int tmp_ret = OB_SUCCESS; ObRestorePersistHelper restore_helper; ObHisRestoreJobPersistInfo restore_history_info; - if (OB_FAIL(restore_helper.init(OB_SYS_TENANT_ID))) { + if (OB_FAIL(restore_helper.init(OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init retore helper", K(ret)); } else if (OB_FAIL(restore_helper.get_restore_job_history( *sql_proxy_, job.get_initiator_job_id(), job.get_initiator_tenant_id(), restore_history_info))) { diff --git a/src/rootserver/restore/ob_restore_scheduler.cpp b/src/rootserver/restore/ob_restore_scheduler.cpp index 7643ed1752..faf39aa33a 100644 --- a/src/rootserver/restore/ob_restore_scheduler.cpp +++ b/src/rootserver/restore/ob_restore_scheduler.cpp @@ -102,7 +102,7 @@ void ObRestoreScheduler::do_work() LOG_INFO("[RESTORE] try process restore job"); ObArray job_infos; ObPhysicalRestoreTableOperator restore_op; - if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_))) { + if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("fail init", K(ret), K(tenant_id_)); } else if (OB_FAIL(restore_op.get_jobs(job_infos))) { LOG_WARN("fail to get jobs", KR(ret), K(tenant_id_)); @@ -241,7 +241,7 @@ int ObRestoreScheduler::restore_tenant(const ObPhysicalRestoreJob &job_info) ObPhysicalRestoreTableOperator restore_op; const int64_t job_id = job_info.get_job_id(); const uint64_t new_tenant_id = tenant_id; - if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_))) { + if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("fail init", K(ret), K(tenant_id_)); } else if (OB_FAIL(restore_op.update_restore_option( job_id, "tenant_id", new_tenant_id))) { @@ -458,7 +458,7 @@ int ObRestoreScheduler::fill_restore_statistics(const share::ObPhysicalRestoreJo } if (OB_SUCC(ret)) { share::ObRestorePersistHelper helper; - if (OB_FAIL(helper.init(job_info.get_tenant_id()))) { + if (OB_FAIL(helper.init(job_info.get_tenant_id(), share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("fail to init heler", K(ret)); } else if (OB_FAIL(helper.insert_initial_restore_progress(*sql_proxy_, restore_progress_info))) { LOG_WARN("fail to insert initail ls restore progress", K(ret)); @@ -804,7 +804,7 @@ int ObRestoreScheduler::try_update_job_status( LOG_WARN("not inited", K(ret)); } else if (OB_FAIL(restore_service_->check_stop())) { LOG_WARN("restore scheduler stopped", K(ret)); - } else if (OB_FAIL(restore_op.init(&sql_client, tenant_id_))) { + } else if (OB_FAIL(restore_op.init(&sql_client, tenant_id_, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("fail init", K(ret), K(tenant_id_)); } else { PhysicalRestoreStatus next_status = get_next_status(return_ret, job.get_status()); @@ -1334,7 +1334,7 @@ int ObRestoreScheduler::check_all_ls_restore_to_consistent_scn_finish_( bool is_finished = false; bool is_success = false; ObPhysicalRestoreTableOperator restore_op; - if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id))) { + if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id, share::OBCG_STORAGE/*group_id*/))) { LOG_WARN("fail init", K(ret), K(tenant_id_)); } else if (OB_FAIL(restore_op.check_finish_restore_to_consistent_scn(is_finished, is_success))) { LOG_WARN("fail to check finish restore to consistent_scn", K(ret), K(tenant_id)); @@ -1368,7 +1368,7 @@ int ObRestoreScheduler::restore_wait_tenant_finish(const share::ObPhysicalRestor LOG_WARN("not inited", KR(ret)); } else if (OB_FAIL(restore_service_->check_stop())) { LOG_WARN("restore scheduler stopped", KR(ret)); - } else if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_))) { + } else if (OB_FAIL(restore_op.init(sql_proxy_, tenant_id_, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("fail init", K(ret), K(tenant_id_)); } else if (OB_FAIL(ObRestoreUtil::get_user_restore_job_history( *sql_proxy_, job_info.get_tenant_id(), diff --git a/src/rootserver/restore/ob_restore_util.cpp b/src/rootserver/restore/ob_restore_util.cpp index b81c932e88..e99ea9376e 100644 --- a/src/rootserver/restore/ob_restore_util.cpp +++ b/src/rootserver/restore/ob_restore_util.cpp @@ -120,7 +120,7 @@ int ObRestoreUtil::record_physical_restore_job( } else if (has_job) { ret = OB_RESTORE_IN_PROGRESS; LOG_WARN("restore tenant job already exist", K(ret), K(job)); - } else if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID))) { + } else if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("fail init restore op", K(ret)); } else if (OB_FAIL(restore_op.insert_job(job))) { LOG_WARN("fail insert job and partitions", K(ret), K(job)); @@ -142,7 +142,7 @@ int ObRestoreUtil::insert_user_tenant_restore_job( ObPhysicalRestoreTableOperator restore_op; ObPhysicalRestoreJob initaitor_job_info; ObPhysicalRestoreJob job_info; - if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID))) { + if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init restore op", KR(ret), K(user_tenant_id)); } else if (OB_FAIL(restore_op.get_job_by_tenant_name( tenant_name, initaitor_job_info))) { @@ -167,9 +167,9 @@ int ObRestoreUtil::insert_user_tenant_restore_job( const uint64_t exec_tenant_id = gen_meta_tenant_id(user_tenant_id); if (OB_FAIL(trans.start(&sql_client, exec_tenant_id))) { LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id)); - } else if (OB_FAIL(user_restore_op.init(&trans, user_tenant_id))) { + } else if (OB_FAIL(user_restore_op.init(&trans, user_tenant_id, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init restore op", KR(ret), K(user_tenant_id)); - } else if (OB_FAIL(restore_persist_op.init(user_tenant_id))) { + } else if (OB_FAIL(restore_persist_op.init(user_tenant_id, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init restore persist op", KR(ret), K(user_tenant_id)); } else if (OB_FAIL(user_restore_op.insert_job(job_info))) { LOG_WARN("failed to insert job", KR(ret), K(job_info)); @@ -199,7 +199,7 @@ int ObRestoreUtil::check_has_physical_restore_job( ObArray jobs; has_job = false; ObPhysicalRestoreTableOperator restore_op; - if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID))) { + if (OB_FAIL(restore_op.init(&sql_client, OB_SYS_TENANT_ID, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("fail init restore op", K(ret)); } else if (OB_FAIL(restore_op.get_jobs(jobs))) { LOG_WARN("fail get jobs", K(ret)); @@ -828,7 +828,7 @@ int ObRestoreUtil::recycle_restore_job(const uint64_t tenant_id, LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id)); } else { ObPhysicalRestoreTableOperator restore_op; - if (OB_FAIL(restore_op.init(&trans, tenant_id))) { + if (OB_FAIL(restore_op.init(&trans, tenant_id, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init restore op", KR(ret), K(tenant_id)); } else if (OB_FAIL(restore_op.remove_job(job_id))) { LOG_WARN("failed to remove job", KR(ret), K(tenant_id), K(job_id)); @@ -840,7 +840,7 @@ int ObRestoreUtil::recycle_restore_job(const uint64_t tenant_id, common::ObArray ls_restore_progress_infos; key.tenant_id_ = tenant_id; key.job_id_ = job_info.get_job_id(); - if (OB_FAIL(persist_helper.init(tenant_id))) { + if (OB_FAIL(persist_helper.init(tenant_id, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init persist helper", KR(ret), K(tenant_id)); } else if (OB_FAIL(persist_helper.get_restore_process( trans, key, restore_progress))) { @@ -900,13 +900,13 @@ int ObRestoreUtil::recycle_restore_job(common::ObMySQLProxy &sql_proxy, LOG_WARN("invalid argument", KR(ret), K(exec_tenant_id)); } else if (OB_FAIL(trans.start(&sql_proxy, exec_tenant_id))) { LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id)); - } else if (OB_FAIL(persist_helper.init(tenant_id))) { + } else if (OB_FAIL(persist_helper.init(tenant_id, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init persist helper", KR(ret)); } else if (OB_FAIL(persist_helper.insert_restore_job_history(trans, history_info))) { LOG_WARN("failed to insert restore job history", KR(ret), K(history_info)); } else { ObPhysicalRestoreTableOperator restore_op; - if (OB_FAIL(restore_op.init(&trans, tenant_id))) { + if (OB_FAIL(restore_op.init(&trans, tenant_id, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init restore op", KR(ret), K(tenant_id)); } else if (OB_FAIL(restore_op.remove_job(job_id))) { LOG_WARN("failed to remove job", KR(ret), K(tenant_id), K(job_id)); @@ -942,7 +942,7 @@ int ObRestoreUtil::get_user_restore_job_history(common::ObISQLClient &sql_client K(initiator_job_id), K(initiator_tenant_id)); } else { ObRestorePersistHelper user_persist_helper; - if (OB_FAIL(user_persist_helper.init(user_tenant_id))) { + if (OB_FAIL(user_persist_helper.init(user_tenant_id, share::OBCG_STORAGE /*group_id*/))) { LOG_WARN("failed to init persist helper", KR(ret), K(user_tenant_id)); } else if (OB_FAIL(user_persist_helper.get_restore_job_history( sql_client, initiator_job_id, initiator_tenant_id, diff --git a/src/share/ls/ob_ls_status_operator.cpp b/src/share/ls/ob_ls_status_operator.cpp index 6ce5c84f9b..940100d650 100644 --- a/src/share/ls/ob_ls_status_operator.cpp +++ b/src/share/ls/ob_ls_status_operator.cpp @@ -614,15 +614,16 @@ int ObLSStatusOperator::get_ls_init_member_list( ret = OB_INVALID_ARGUMENT; LOG_WARN("tenant id is invalid", KR(ret), K(tenant_id)); } else if (OB_FAIL(get_ls_status_(tenant_id, id, true /*need_member_list*/, - member_list, status_info, client, arb_member, learner_list))) { + member_list, status_info, client, arb_member, learner_list, share::OBCG_DEFAULT))) { LOG_WARN("failed to get ls status", KR(ret), K(id), K(tenant_id)); } return ret; } int ObLSStatusOperator::get_ls_status_info( - const uint64_t tenant_id, - const ObLSID &id, ObLSStatusInfo &status_info, ObISQLClient &client) + const uint64_t tenant_id, + const ObLSID &id, ObLSStatusInfo &status_info, ObISQLClient &client, + const int32_t group_id) { int ret = OB_SUCCESS; ObMemberList member_list; @@ -633,7 +634,7 @@ int ObLSStatusOperator::get_ls_status_info( ret = OB_INVALID_ARGUMENT; LOG_WARN("tenant id is invalid", KR(ret), K(tenant_id)); } else if (OB_FAIL(get_ls_status_(tenant_id, id, false /*need_member_list*/, - member_list, status_info, client, arb_member, learner_list))) { + member_list, status_info, client, arb_member, learner_list, group_id))) { LOG_WARN("failed to get ls status", KR(ret), K(id), K(tenant_id)); } return ret; @@ -642,7 +643,8 @@ int ObLSStatusOperator::get_ls_status_info( int ObLSStatusOperator::get_duplicate_ls_status_info( const uint64_t tenant_id, ObISQLClient &client, - share::ObLSStatusInfo &status_info) + share::ObLSStatusInfo &status_info, + const int32_t group_id) { int ret = OB_SUCCESS; status_info.reset(); @@ -665,7 +667,7 @@ int ObLSStatusOperator::get_duplicate_ls_status_info( flag_str.ptr()))) { LOG_WARN("failed to assign sql", KR(ret), K(sql)); } else if (OB_FAIL(inner_get_ls_status_(sql, get_exec_tenant_id(tenant_id), need_member_list, - client, member_list, status_info, arb_member, learner_list))) { + client, member_list, status_info, arb_member, learner_list, group_id))) { LOG_WARN("fail to inner get ls status info", KR(ret), K(sql), K(tenant_id), "exec_tenant_id", get_exec_tenant_id(tenant_id), K(need_member_list)); } @@ -918,16 +920,17 @@ int ObLSStatusOperator::inner_get_ls_status_( ObMemberList &member_list, share::ObLSStatusInfo &status_info, ObMember &arb_member, - common::GlobalLearnerList &learner_list) + common::GlobalLearnerList &learner_list, + const int32_t group_id) { int ret = OB_SUCCESS; member_list.reset(); status_info.reset(); learner_list.reset(); arb_member.reset(); - if (OB_UNLIKELY(sql.empty() || OB_INVALID_TENANT_ID == exec_tenant_id)) { + if (OB_UNLIKELY(sql.empty() || OB_INVALID_TENANT_ID == exec_tenant_id || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(sql), K(exec_tenant_id)); + LOG_WARN("invalid argument", KR(ret), K(sql), K(exec_tenant_id), K(group_id)); } else { ObTimeoutCtx ctx; const int64_t default_timeout = GCONF.internal_sql_execute_timeout; @@ -936,7 +939,7 @@ int ObLSStatusOperator::inner_get_ls_status_( } else { HEAP_VAR(ObMySQLProxy::MySQLResult, res) { common::sqlclient::ObMySQLResult *result = NULL; - if (OB_FAIL(client.read(res, exec_tenant_id, sql.ptr()))) { + if (OB_FAIL(client.read(res, exec_tenant_id, sql.ptr(), group_id))) { LOG_WARN("failed to read", KR(ret), K(exec_tenant_id), K(sql)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -1000,7 +1003,8 @@ int ObLSStatusOperator::get_ls_status_(const uint64_t tenant_id, share::ObLSStatusInfo &status_info, ObISQLClient &client, ObMember &arb_member, - common::GlobalLearnerList &learner_list) + common::GlobalLearnerList &learner_list, + const int32_t group_id) { int ret = OB_SUCCESS; member_list.reset(); @@ -1008,14 +1012,15 @@ int ObLSStatusOperator::get_ls_status_(const uint64_t tenant_id, status_info.reset(); ObSqlString sql; if (OB_UNLIKELY(!id.is_valid() - || OB_INVALID_TENANT_ID == tenant_id)) { + || OB_INVALID_TENANT_ID == tenant_id + || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(id), K(tenant_id)); + LOG_WARN("invalid argument", KR(ret), K(id), K(tenant_id), K(group_id)); } else if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s where ls_id = %ld and tenant_id = %lu", OB_ALL_LS_STATUS_TNAME, id.id(), tenant_id))) { LOG_WARN("failed to assign sql", KR(ret), K(sql)); } else if (OB_FAIL(inner_get_ls_status_(sql, get_exec_tenant_id(tenant_id), need_member_list, - client, member_list, status_info, arb_member, learner_list))) { + client, member_list, status_info, arb_member, learner_list, group_id))) { LOG_WARN("fail to inner get ls status info", KR(ret), K(sql), K(tenant_id), "exec_tenant_id", get_exec_tenant_id(tenant_id), K(need_member_list)); } diff --git a/src/share/ls/ob_ls_status_operator.h b/src/share/ls/ob_ls_status_operator.h index b989859b6c..0026a4a44b 100644 --- a/src/share/ls/ob_ls_status_operator.h +++ b/src/share/ls/ob_ls_status_operator.h @@ -365,7 +365,8 @@ public: // @return OB_ENTRY_NOT_EXIST if duplicate log stream not exist int get_duplicate_ls_status_info(const uint64_t tenant_id, ObISQLClient &client, - share::ObLSStatusInfo &status_info); + share::ObLSStatusInfo &status_info, + const int32_t group_id); /** * @description: * get ls list from all_ls_status order by tenant_id, ls_id for switchover tenant @@ -398,7 +399,8 @@ public: ObMember &arb_member, common::GlobalLearnerList &learner_list); int get_ls_status_info(const uint64_t tenant_id, const ObLSID &id, - ObLSStatusInfo &status_info, ObISQLClient &client); + ObLSStatusInfo &status_info, ObISQLClient &client, + const int32_t group_id = 0); int fill_cell(common::sqlclient::ObMySQLResult *result, share::ObLSStatusInfo &status_info); int fill_cell(common::sqlclient::ObMySQLResult *result, @@ -508,12 +510,14 @@ private: int inner_get_ls_status_(const ObSqlString &sql, const uint64_t exec_tenant_id, const bool need_member_list, ObISQLClient &client, ObMemberList &member_list, share::ObLSStatusInfo &status_info, - ObMember &arb_member, common::GlobalLearnerList &learner_list); + ObMember &arb_member, common::GlobalLearnerList &learner_list, + const int32_t group_id); int get_ls_status_(const uint64_t tenant_id, const ObLSID &id, const bool need_member_list, ObMemberList &member_list, ObLSStatusInfo &status_info, ObISQLClient &client, - ObMember &arb_member, common::GlobalLearnerList &learner_list); + ObMember &arb_member, common::GlobalLearnerList &learner_list, + const int32_t group_id); int construct_ls_primary_info_sql_(common::ObSqlString &sql); diff --git a/src/share/ob_inner_table_operator.cpp b/src/share/ob_inner_table_operator.cpp index 5e241dec0b..84784a6e73 100644 --- a/src/share/ob_inner_table_operator.cpp +++ b/src/share/ob_inner_table_operator.cpp @@ -93,7 +93,7 @@ int ObIInnerTableRow::build_assignments(ObSqlString &assignments) const * ------------------------------ObInnerTableOperator--------------------- */ ObInnerTableOperator::ObInnerTableOperator() - : is_inited_(false), table_name_(), exec_tenant_id_provider_(nullptr) + : is_inited_(false), table_name_(), exec_tenant_id_provider_(nullptr), group_id_(0) { } @@ -104,19 +104,21 @@ ObInnerTableOperator::~ObInnerTableOperator() } int ObInnerTableOperator::init( - const char *tname, const ObIExecTenantIdProvider &exec_tenant_id_provider) + const char *tname, const ObIExecTenantIdProvider &exec_tenant_id_provider, + const int32_t group_id) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObInnerTableOperator init twice", K(ret)); - } else if (OB_ISNULL(tname)) { + } else if (OB_ISNULL(tname) || group_id < 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("empty table name", K(ret), K(tname)); + LOG_WARN("empty table name", K(ret), K(tname), K(group_id)); } else if (OB_FAIL(table_name_.assign(tname))) { LOG_WARN("failed to assign table name", K(ret), K(tname)); } else { exec_tenant_id_provider_ = &exec_tenant_id_provider; + group_id_ = group_id; is_inited_ = true; } @@ -642,7 +644,7 @@ int ObInnerTableOperator::do_insert_row_(ObISQLClient &proxy, const ObIInnerTabl LOG_WARN("fail to fill dml", K(ret), K(this), K(row)); } else if (OB_FAIL(dml.splice_insert_sql(tname, sql))) { LOG_WARN("failed to splice insert sql", K(ret), K(this), K(row)); - } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("fail to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else { LOG_INFO("insert one row", K(row), K(affected_rows), K(sql)); @@ -665,7 +667,7 @@ int ObInnerTableOperator::do_update_row_(ObISQLClient &proxy, const ObIInnerTabl LOG_WARN("fail to fill dml", K(ret), K(this), K(row)); } else if (OB_FAIL(dml.splice_update_sql(tname, sql))) { LOG_WARN("failed to splice update sql", K(ret), K(this), K(row)); - } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("fail to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else { LOG_INFO("update one row", K(row), K(affected_rows), K(sql)); @@ -688,7 +690,7 @@ int ObInnerTableOperator::do_insert_or_update_row_(ObISQLClient &proxy, const Ob LOG_WARN("fail to fill dml", K(ret), K(this), K(row)); } else if (OB_FAIL(dml.splice_insert_update_sql(tname, sql))) { LOG_WARN("failed to splice insert update sql", K(ret), K(this), K(row)); - } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("fail to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else { LOG_INFO("insert/update one row", K(row), K(affected_rows), K(sql)); @@ -711,7 +713,7 @@ int ObInnerTableOperator::do_delete_row_( LOG_WARN("fail to fill pkey dml", K(ret), K(this), K(key)); } else if (OB_FAIL(dml.splice_delete_sql(tname, sql))) { LOG_WARN("failed to splice delete sql", K(ret), K(this), K(key)); - } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("fail to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else { LOG_INFO("delete one row", K(key), K(affected_rows), K(sql)); @@ -746,7 +748,7 @@ int ObInnerTableOperator::do_get_column_( } else { HEAP_VAR(ObMySQLProxy::ReadResult, res) { ObMySQLResult *result = NULL; - if (OB_FAIL(proxy.read(res, exec_tenant_id, sql.ptr()))) { + if (OB_FAIL(proxy.read(res, exec_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("failed to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -785,7 +787,7 @@ int ObInnerTableOperator::do_get_int_column_( } else { HEAP_VAR(ObMySQLProxy::ReadResult, res) { ObMySQLResult *result = NULL; - if (OB_FAIL(proxy.read(res, exec_tenant_id, sql.ptr()))) { + if (OB_FAIL(proxy.read(res, exec_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("failed to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -827,7 +829,7 @@ int ObInnerTableOperator::do_get_string_column_( } else { HEAP_VAR(ObMySQLProxy::ReadResult, res) { ObMySQLResult *result = NULL; - if (OB_FAIL(proxy.read(res, exec_tenant_id, sql.ptr()))) { + if (OB_FAIL(proxy.read(res, exec_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("failed to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -862,7 +864,7 @@ int ObInnerTableOperator::do_increase_column_by_( LOG_WARN("fail to fill predicates", K(ret), K(key), K(column_name), K(value)); } else if (OB_FAIL(sql.append_fmt(" where %s", predicates.ptr()))) { LOG_WARN("failed to append sql", K(ret), K(sql), K(predicates)); - } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("fail to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else { LOG_INFO("update one column", K(key), K(column_name), K(value), K(affected_rows), K(sql)); @@ -891,7 +893,7 @@ int ObInnerTableOperator::do_update_column_( LOG_WARN("fail to fill predicates", K(ret), K(key), K(assignments)); } else if (OB_FAIL(sql.append_fmt(" where %s", predicates.ptr()))) { LOG_WARN("failed to append sql", K(ret), K(sql), K(predicates)); - } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("fail to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else { LOG_INFO("update one column", K(key), K(assignments), K(affected_rows), K(sql)); @@ -926,7 +928,7 @@ int ObInnerTableOperator::do_compare_and_swap_( LOG_WARN("failed to append sql", K(ret), K(sql), K(pkey_predicates)); } else if (OB_FAIL(sql.append_fmt(" and %s", predicates))) { LOG_WARN("failed to append sql", K(ret), K(sql), K(assignments), K(predicates)); - } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(proxy.write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("fail to exec sql", K(ret), K(sql), K(exec_tenant_id)); } else { LOG_INFO("compare and swap one column", K(key), K(assignments), K(predicates), K(affected_rows), K(sql)); @@ -1001,4 +1003,4 @@ int ObInnerTableOperator::parse_one_column_( } return ret; -} \ No newline at end of file +} diff --git a/src/share/ob_inner_table_operator.h b/src/share/ob_inner_table_operator.h index 51f8e4b73f..d8316cbc69 100644 --- a/src/share/ob_inner_table_operator.h +++ b/src/share/ob_inner_table_operator.h @@ -107,7 +107,7 @@ public: // row operation // Init operator with operation table name. int init( - const char *tname, const ObIExecTenantIdProvider &exec_tenant_id_provider); + const char *tname, const ObIExecTenantIdProvider &exec_tenant_id_provider, const int32_t group_id = 0); // Get operation table name. const char *get_table_name() const; const ObIExecTenantIdProvider *get_exec_tenant_id_provider() const; @@ -270,6 +270,7 @@ private: bool is_inited_; TableName table_name_; // operation table name. const ObIExecTenantIdProvider *exec_tenant_id_provider_; // provide tenant id to exec sql. + int32_t group_id_; //remote inner sql rpc queue }; @@ -278,4 +279,4 @@ private: } } -#endif \ No newline at end of file +#endif diff --git a/src/share/ob_tablet_replica_checksum_operator.cpp b/src/share/ob_tablet_replica_checksum_operator.cpp index 521f2bed0c..a6193b67a4 100644 --- a/src/share/ob_tablet_replica_checksum_operator.cpp +++ b/src/share/ob_tablet_replica_checksum_operator.cpp @@ -497,6 +497,7 @@ int ObTabletReplicaChecksumOperator::batch_get( items.reset(); tablet_items_cnt = 0; const int64_t pairs_cnt = pairs.count(); + const int32_t group_id = share::OBCG_DEFAULT; if (OB_UNLIKELY(pairs_cnt < 1 || OB_INVALID_TENANT_ID == tenant_id)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(pairs_cnt)); @@ -511,7 +512,7 @@ int ObTabletReplicaChecksumOperator::batch_get( false/*include_larger_than*/, false/*with_compaction_scn*/))) { LOG_WARN("fail to construct batch get sql", KR(ret), K(tenant_id), K(pairs), K(start_idx), K(end_idx)); - } else if (OB_FAIL(inner_batch_get_by_sql_(tenant_id, sql, sql_proxy, items, tmp_tablet_items_cnt))) { + } else if (OB_FAIL(inner_batch_get_by_sql_(tenant_id, sql, group_id, sql_proxy, items, tmp_tablet_items_cnt))) { LOG_WARN("fail to inner batch get by sql", KR(ret), K(tenant_id), K(sql)); } else { start_idx = end_idx; @@ -541,15 +542,16 @@ int ObTabletReplicaChecksumOperator::batch_get( ObISQLClient &sql_proxy, ObIArray &items, int64_t &tablet_items_cnt, - const bool include_larger_than) + const bool include_larger_than, + const int32_t group_id) { int ret = OB_SUCCESS; items.reset(); tablet_items_cnt = 0; const int64_t pairs_cnt = pairs.count(); - if (OB_UNLIKELY(pairs_cnt < 1 || OB_INVALID_TENANT_ID == tenant_id)) { + if (OB_UNLIKELY(pairs_cnt < 1 || OB_INVALID_TENANT_ID == tenant_id || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(pairs_cnt)); + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(pairs_cnt), K(group_id)); } // Step 2: cut tablet replica checksum items into small batches int64_t start_idx = 0; @@ -562,7 +564,7 @@ int ObTabletReplicaChecksumOperator::batch_get( sql, include_larger_than, true/*with_compaction_scn*/))) { LOG_WARN("fail to construct batch get sql", KR(ret), K(tenant_id), K(compaction_scn), K(pairs), K(start_idx), K(end_idx)); - } else if (OB_FAIL(inner_batch_get_by_sql_(tenant_id, sql, sql_proxy, items, tmp_tablet_items_cnt))) { + } else if (OB_FAIL(inner_batch_get_by_sql_(tenant_id, sql, group_id, sql_proxy, items, tmp_tablet_items_cnt))) { LOG_WARN("fail to inner batch get by sql", KR(ret), K(tenant_id), K(sql)); } else { start_idx = end_idx; @@ -585,18 +587,19 @@ int ObTabletReplicaChecksumOperator::batch_get( int ObTabletReplicaChecksumOperator::inner_batch_get_by_sql_( const uint64_t tenant_id, const ObSqlString &sql, + const int32_t group_id, ObISQLClient &sql_proxy, ObIArray &items, int64_t &tablet_items_cnt) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || group_id < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(group_id)); } else { const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id); SMART_VAR(ObISQLClient::ReadResult, result) { - if (OB_FAIL(sql_proxy.read(result, meta_tenant_id, sql.ptr()))) { + if (OB_FAIL(sql_proxy.read(result, meta_tenant_id, sql.ptr(), group_id))) { LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), "sql", sql.ptr()); } else if (OB_ISNULL(result.get_result())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/share/ob_tablet_replica_checksum_operator.h b/src/share/ob_tablet_replica_checksum_operator.h index 35d835739c..dd2f2eecf9 100644 --- a/src/share/ob_tablet_replica_checksum_operator.h +++ b/src/share/ob_tablet_replica_checksum_operator.h @@ -115,7 +115,8 @@ public: common::ObISQLClient &sql_proxy, common::ObIArray &items, int64_t &tablet_items_cnt, - const bool include_larger_than = false); + const bool include_larger_than = false, + const int32_t group_id = 0); static int batch_get( const uint64_t tenant_id, const common::ObIArray &pairs, @@ -185,6 +186,7 @@ private: static int inner_batch_get_by_sql_( const uint64_t tenant_id, const common::ObSqlString &sql, + const int32_t group_id, common::ObISQLClient &sql_client, common::ObIArray &items, int64_t &tablet_items_cnt); diff --git a/src/share/resource_manager/ob_group_list.h b/src/share/resource_manager/ob_group_list.h index 3ab0ae4ee4..6a0bc72c96 100644 --- a/src/share/resource_manager/ob_group_list.h +++ b/src/share/resource_manager/ob_group_list.h @@ -32,8 +32,8 @@ CGID_DEF(OBCG_MYSQL_LOGIN, 12) CGID_DEF(OBCG_CDCSERVICE, 13) CGID_DEF(OBCG_DIAG_TENANT, 14) CGID_DEF(OBCG_WR, 15) -CGID_DEF(OBCG_STORAGE_HA_LEVEL1, 16) -CGID_DEF(OBCG_STORAGE_HA_LEVEL2, 17) +CGID_DEF(OBCG_TRANSFER, 16) +CGID_DEF(OBCG_STORAGE_STREAM, 17) CGID_DEF(OBCG_DBA_COMMAND, 18, DEFAULT, 0) CGID_DEF(OBCG_STORAGE, 19) CGID_DEF(OBCG_LOCK, 20) diff --git a/src/share/restore/ob_physical_restore_table_operator.cpp b/src/share/restore/ob_physical_restore_table_operator.cpp index 7f1029e2d1..1fced508a5 100644 --- a/src/share/restore/ob_physical_restore_table_operator.cpp +++ b/src/share/restore/ob_physical_restore_table_operator.cpp @@ -83,23 +83,25 @@ PhysicalRestoreStatus ObPhysicalRestoreTableOperator::get_restore_status( ObPhysicalRestoreTableOperator::ObPhysicalRestoreTableOperator() : inited_(false), - sql_client_(NULL), tenant_id_(OB_INVALID_TENANT_ID) + sql_client_(NULL), tenant_id_(OB_INVALID_TENANT_ID), group_id_(0) { } int ObPhysicalRestoreTableOperator::init(common::ObISQLClient *sql_client, - const uint64_t tenant_id) + const uint64_t tenant_id, + const int32_t group_id) { int ret = OB_SUCCESS; if (inited_) { ret = OB_INIT_TWICE; LOG_WARN("physical restore table operator init twice", K(ret)); - } else if (OB_ISNULL(sql_client) || is_meta_tenant(tenant_id)) { + } else if (OB_ISNULL(sql_client) || is_meta_tenant(tenant_id) || group_id < 0) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("sql client is null or tenant id is invalid", KR(ret), KP(sql_client), K(tenant_id)); + LOG_WARN("sql client is null or tenant id is invalid", KR(ret), KP(sql_client), K(tenant_id), K(group_id)); } else { sql_client_ = sql_client; tenant_id_ = tenant_id; + group_id_ = group_id; inited_ = true; } return ret; @@ -124,7 +126,7 @@ int ObPhysicalRestoreTableOperator::insert_job(const ObPhysicalRestoreJob &job_i LOG_WARN("fail to fill dml splicer", KR(ret), K(tenant_id_), K(job_info)); } else if (OB_FAIL(dml.splice_batch_insert_sql(OB_ALL_RESTORE_JOB_TNAME, sql))) { LOG_WARN("splice_insert_sql failed", KR(ret)); - } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("execute sql failed", KR(ret), K(exec_tenant_id), K(sql)); } else if (affected_rows <= 0) { ret = OB_ERR_UNEXPECTED; @@ -357,7 +359,7 @@ int ObPhysicalRestoreTableOperator::get_jobs( } else if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s ORDER BY job_id, name", OB_ALL_RESTORE_JOB_TNAME))) { LOG_WARN("failed to assign sql", K(ret)); - } else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) { + } else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("execute sql failed", K(ret), K(exec_tenant_id), K(sql)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -748,7 +750,7 @@ int ObPhysicalRestoreTableOperator::check_job_exist( } else if (OB_FAIL(sql.assign_fmt("SELECT count(*) as count FROM %s WHERE job_id = %ld", OB_ALL_RESTORE_JOB_TNAME, job_id))) { LOG_WARN("failed to assign sql", K(ret)); - } else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) { + } else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("execute sql failed", K(ret), K(exec_tenant_id), K(sql)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -785,7 +787,7 @@ int ObPhysicalRestoreTableOperator::get_job( } else if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE job_id = %ld", OB_ALL_RESTORE_JOB_TNAME, job_id))) { LOG_WARN("failed to assign sql", K(ret)); - } else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) { + } else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("execute sql failed", K(ret), K(exec_tenant_id), K(sql)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -853,7 +855,7 @@ int ObPhysicalRestoreTableOperator::update_job_error_info( to_cstring(addr), to_cstring(trace_id), job_id))) { LOG_WARN("failed to set sql", K(ret), K(mod_str), K(return_ret), K(trace_id), K(addr)); - } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("execute sql failed", K(sql), KR(ret), K(exec_tenant_id)); } else if (!is_single_row(affected_rows) && !is_zero_row(affected_rows)) { @@ -888,7 +890,7 @@ int ObPhysicalRestoreTableOperator::update_job_status( "AND name = 'status' AND value != 'RESTORE_FAIL'", OB_ALL_RESTORE_JOB_TNAME, status_str, job_id))) { LOG_WARN("fail to assign fmt", K(ret), K(sql)); - } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("execute sql failed", K(sql), K(ret), K(exec_tenant_id)); } else if (!is_single_row(affected_rows) && !is_zero_row(affected_rows)) { @@ -918,7 +920,7 @@ int ObPhysicalRestoreTableOperator::remove_job( LOG_WARN("failed to add pk column", K(ret), K(job_id)); } else if (OB_FAIL(dml.splice_delete_sql(OB_ALL_RESTORE_JOB_TNAME, sql))) { LOG_WARN("splice_delete_sql failed", K(ret)); - } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("execute sql failed", K(sql), K(ret), K(exec_tenant_id)); } else { // no need to check affected_rows @@ -1017,7 +1019,7 @@ int ObPhysicalRestoreTableOperator::get_restore_job_by_sql_( } else { SMART_VAR(common::ObMySQLProxy::MySQLResult, res) { common::sqlclient::ObMySQLResult *result = NULL; - if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) { + if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("execute sql failed", K(ret), K(exec_tenant_id), K(sql)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -1082,7 +1084,7 @@ int ObPhysicalRestoreTableOperator::check_finish_restore_to_consistent_scn( "left join %s as b on a.ls_id = b.ls_id", OB_ALL_LS_STATUS_TNAME, OB_ALL_LS_META_TABLE_TNAME))) { LOG_WARN("failed to assign sql", K(ret)); - } else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr()))) { + } else if (OB_FAIL(sql_client_->read(res, exec_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("execute sql failed", KR(ret), K(sql)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/share/restore/ob_physical_restore_table_operator.h b/src/share/restore/ob_physical_restore_table_operator.h index 2c8f8cc083..150f33d048 100644 --- a/src/share/restore/ob_physical_restore_table_operator.h +++ b/src/share/restore/ob_physical_restore_table_operator.h @@ -41,7 +41,7 @@ public: * @param[in] tenant_id of restore job, maybe sys or user tenant * @param[in] sql client */ - int init(common::ObISQLClient *sql_client, const uint64_t tenant_id); + int init(common::ObISQLClient *sql_client, const uint64_t tenant_id, const int32_t group_id); /* * description: insert into __all_restore_job * @param[in] restore job @@ -160,6 +160,7 @@ private: bool inited_; common::ObISQLClient *sql_client_; uint64_t tenant_id_; + int32_t group_id_; DISALLOW_COPY_AND_ASSIGN(ObPhysicalRestoreTableOperator); }; @@ -188,7 +189,7 @@ int ObPhysicalRestoreTableOperator::update_restore_option( SHARE_LOG(WARN, "fail to add column", KR(ret), K(option_value)); } else if (OB_FAIL(dml.splice_update_sql(OB_ALL_RESTORE_JOB_TNAME, sql))) { SHARE_LOG(WARN, "splice_insert_sql failed", KR(ret)); - } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client_->write(exec_tenant_id, sql.ptr(), group_id_, affected_rows))) { SHARE_LOG(WARN, "execute sql failed", K(sql), KR(ret), K(exec_tenant_id)); } else if (affected_rows <= 0) { ret = OB_ERR_UNEXPECTED; diff --git a/src/share/restore/ob_restore_persist_helper.cpp b/src/share/restore/ob_restore_persist_helper.cpp index 8c177f9ed7..1d23722401 100644 --- a/src/share/restore/ob_restore_persist_helper.cpp +++ b/src/share/restore/ob_restore_persist_helper.cpp @@ -678,7 +678,7 @@ int ObHisRestoreJobPersistInfo::init_initiator_job_history( * ------------------------------ObRestorePersistHelper--------------------- */ ObRestorePersistHelper::ObRestorePersistHelper() - : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID) + : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), group_id_(0) { } @@ -688,7 +688,7 @@ uint64_t ObRestorePersistHelper::get_exec_tenant_id() const return gen_meta_tenant_id(tenant_id_); } -int ObRestorePersistHelper::init(const uint64_t tenant_id) +int ObRestorePersistHelper::init(const uint64_t tenant_id, const int32_t group_id) { int ret = OB_SUCCESS; if(!is_sys_tenant(tenant_id) && !is_user_tenant(tenant_id)) { @@ -696,6 +696,7 @@ int ObRestorePersistHelper::init(const uint64_t tenant_id) LOG_WARN("invalid tenant id", K(ret), K(tenant_id)); } else { tenant_id_ = tenant_id; + group_id_ = group_id; is_inited_ = true; } return ret; @@ -711,7 +712,7 @@ int ObRestorePersistHelper::insert_initial_restore_progress( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init restore progress table", K(ret)); } else if (OB_FAIL(restore_progress_table_operator.insert_or_update_row(proxy, persist_info, affected_rows))) { LOG_WARN("failed to insert initial restore progress", K(ret)); @@ -730,7 +731,7 @@ int ObRestorePersistHelper::get_restore_process( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init restore progress table", K(ret)); } else if (OB_FAIL(table_op.get_row(proxy, false, key, persist_info))) { LOG_WARN("failed to get persist info", KR(ret), K(key)); @@ -748,7 +749,7 @@ int ObRestorePersistHelper::insert_restore_job_history( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_JOB_HISTORY_TNAME, *this))) { + } else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_JOB_HISTORY_TNAME, *this, group_id_))) { LOG_WARN("failed to init restore progress table", K(ret)); } else if (OB_FAIL(table_op.insert_row(proxy, persist_info, affected_rows))) { LOG_WARN("failed to get persist info", KR(ret), K(persist_info)); @@ -768,7 +769,7 @@ int ObRestorePersistHelper::get_restore_job_history( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_JOB_HISTORY_TNAME, *this))) { + } else if (OB_FAIL(table_op.init(OB_ALL_RESTORE_JOB_HISTORY_TNAME, *this, group_id_))) { LOG_WARN("failed to init restore progress table", K(ret)); } else if (OB_FAIL(table_op.get_row(proxy, false, restore_key, persist_info))) { LOG_WARN("failed to get persist info", KR(ret), K(restore_key)); @@ -786,7 +787,7 @@ int ObRestorePersistHelper::insert_initial_ls_restore_progress( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init ls restore progress table", K(ret)); } else if (OB_FAIL(ls_restore_progress_table_operator.insert_or_update_row(proxy, persist_info, affected_rows))) { LOG_WARN("failed to insert initial ls restore progress", K(ret)); @@ -805,7 +806,7 @@ int ObRestorePersistHelper::record_ls_his_restore_progress( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(ls_his_restore_table_operator.init(OB_ALL_LS_RESTORE_HISTORY_TNAME, *this))) { + } else if (OB_FAIL(ls_his_restore_table_operator.init(OB_ALL_LS_RESTORE_HISTORY_TNAME, *this, group_id_))) { LOG_WARN("failed to init ls his restore progress table", K(ret)); } else if (OB_FAIL(ls_his_restore_table_operator.insert_row(proxy, persist_info, affected_rows))) { LOG_WARN("failed to insert ls his restore progress", K(ret)); @@ -824,7 +825,7 @@ int ObRestorePersistHelper::inc_need_restore_ls_count_by_one( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init restore progress table", K(ret)); } else if (OB_FAIL(restore_progress_table_operator.increase_column_by_one(trans, job_key, OB_STR_LS_COUNT, affected_rows))) { LOG_WARN("failed to increase finished ls count in restore progress table", K(ret), K(job_key)); @@ -854,11 +855,11 @@ int ObRestorePersistHelper::inc_finished_ls_count_by_one( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init restore progress table", K(ret)); - } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init ls restore progress table", K(ret)); - } else if (OB_FAIL(ls_his_restore_table_operator.init(OB_ALL_LS_RESTORE_HISTORY_TNAME, *this))) { + } else if (OB_FAIL(ls_his_restore_table_operator.init(OB_ALL_LS_RESTORE_HISTORY_TNAME, *this, group_id_))) { LOG_WARN("failed to init ls restore history table", K(ret)); } else if (OB_FAIL(ls_restore_progress_table_operator.get_row(trans, true/* need lock */, ls_key, progress_info))) { LOG_WARN("failed to get ls restore progress", K(ret), K(ls_key)); @@ -888,9 +889,9 @@ int ObRestorePersistHelper::inc_finished_tablet_count_by_one( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init restore progress table", K(ret)); - } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init ls restore progress table", K(ret)); } else if (OB_FAIL(restore_progress_table_operator.increase_column_by_one(trans, job_key, OB_STR_FINISH_TABLET_COUNT, affected_rows))) { LOG_WARN("failed to increase finished tablet count in restore progress table", K(ret), K(job_key)); @@ -915,9 +916,9 @@ int ObRestorePersistHelper::inc_finished_restored_block_bytes( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(restore_progress_table_operator.init(OB_ALL_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init restore progress table", K(ret)); - } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init ls restore progress table", K(ret)); } else if (OB_FAIL(restore_progress_table_operator.increase_column_by(trans, job_key, OB_STR_FINISH_BYTES, inc_finished_bytes, affected_rows))) { LOG_WARN("failed to update finished bytes in restore progress table", K(ret), K(job_key), K(inc_finished_bytes)); @@ -939,7 +940,7 @@ int ObRestorePersistHelper::update_log_restore_progress( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObRestorePersistHelper not init", K(ret)); - } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init ls restore progress table", K(ret)); } else if (OB_FAIL(ls_restore_progress_table_operator.update_uint_column(proxy, ls_key, OB_STR_LAST_REPLAY_SCN, last_replay_scn.get_val_for_inner_table_field(), affected_rows))) { LOG_WARN("failed to update last replay scn in ls restore progress table", K(ret), K(ls_key), K(last_replay_scn)); @@ -965,7 +966,7 @@ int ObRestorePersistHelper::update_ls_restore_status( ret = OB_INVALID_ARGUMENT; LOG_WARN("comment must not be null", K(ret), KP(comment)); } else if (OB_FALSE_IT(trace_id.to_string(trace_id_str, OB_MAX_TRACE_ID_BUFFER_SIZE))) { - } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this))) { + } else if (OB_FAIL(ls_restore_progress_table_operator.init(OB_ALL_LS_RESTORE_PROGRESS_TNAME, *this, group_id_))) { LOG_WARN("failed to init ls restore progress table", K(ret)); } else if (OB_FAIL(ls_restore_progress_table_operator.update_int_column(proxy, ls_key, OB_STR_STATUS, status, affected_rows))) { diff --git a/src/share/restore/ob_restore_persist_helper.h b/src/share/restore/ob_restore_persist_helper.h index 67743f1670..5d41e0aa46 100644 --- a/src/share/restore/ob_restore_persist_helper.h +++ b/src/share/restore/ob_restore_persist_helper.h @@ -407,7 +407,7 @@ public: uint64_t get_exec_tenant_id() const override; - int init(const uint64_t tenant_id); + int init(const uint64_t tenant_id, const int32_t group_id); int insert_initial_restore_progress( common::ObISQLClient &proxy, const ObRestoreProgressPersistInfo &persist_info) const; @@ -474,6 +474,7 @@ private: bool is_inited_; uint64_t tenant_id_; // sys or user tenant id + int32_t group_id_; }; diff --git a/src/share/tablet/ob_tablet_table_operator.cpp b/src/share/tablet/ob_tablet_table_operator.cpp index e1c1430d65..c1d5e70bc6 100644 --- a/src/share/tablet/ob_tablet_table_operator.cpp +++ b/src/share/tablet/ob_tablet_table_operator.cpp @@ -29,7 +29,7 @@ using namespace common; namespace share { ObTabletTableOperator::ObTabletTableOperator() - : inited_(false), sql_proxy_(NULL), batch_size_(MAX_BATCH_COUNT) + : inited_(false), sql_proxy_(NULL), batch_size_(MAX_BATCH_COUNT), group_id_(0) { } @@ -46,8 +46,27 @@ int ObTabletTableOperator::init(ObISQLClient &sql_proxy) LOG_WARN("init twice", KR(ret)); } else { sql_proxy_ = &sql_proxy; - inited_ = true; batch_size_ = MAX_BATCH_COUNT; + group_id_ = 0; /*OBCG_DEFAULT*/ + inited_ = true; + } + return ret; +} + +int ObTabletTableOperator::init(const int32_t group_id, common::ObISQLClient &sql_proxy) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("init twice", KR(ret)); + } else if (group_id < 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("init tablet table operator get invalid argument", K(ret), K(group_id)); + } else { + sql_proxy_ = &sql_proxy; + batch_size_ = MAX_BATCH_COUNT; + group_id_ = group_id; + inited_ = true; } return ret; } @@ -124,6 +143,7 @@ int ObTabletTableOperator::batch_get_tablet_info( common::ObISQLClient *sql_proxy, const uint64_t tenant_id, const ObIArray &tablet_ls_infos, + const int32_t group_id, ObIArray &tablet_infos) { int ret = OB_SUCCESS; @@ -141,6 +161,7 @@ int ObTabletTableOperator::batch_get_tablet_info( tablet_ls_infos, start_idx, end_idx, + group_id, tablet_infos))) { LOG_WARN("fail to inner batch get by sql", KR(ret), K(tenant_id), K(tablet_ls_infos), K(start_idx), K(end_idx)); @@ -162,6 +183,7 @@ int ObTabletTableOperator::inner_batch_get_tablet_by_sql_( const ObIArray &tablet_ls_infos, const int64_t start_idx, const int64_t end_idx, + const int32_t group_id, ObIArray &tablet_infos) { int ret = OB_SUCCESS; @@ -204,7 +226,7 @@ int ObTabletTableOperator::inner_batch_get_tablet_by_sql_( } if (FAILEDx(sql.append_fmt(") ORDER BY FIELD(tablet_id%s)", part_sql.string().ptr()))) { LOG_WARN("assign sql string failed", KR(ret)); - } else if (OB_FAIL(sql_client.read(result, sql_tenant_id, sql.ptr()))) { + } else if (OB_FAIL(sql_client.read(result, sql_tenant_id, sql.ptr(), group_id))) { LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql_tenant_id), "sql", sql.ptr()); } else if (OB_ISNULL(result.get_result())) { @@ -590,7 +612,7 @@ int ObTabletTableOperator::inner_batch_update_by_sql_( } if (FAILEDx(dml.splice_batch_insert_update_sql(OB_ALL_TABLET_META_TABLE_TNAME, sql))) { LOG_WARN("fail to splice batch insert update sql", KR(ret), K(sql)); - } else if (OB_FAIL(sql_client.write(sql_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client.write(sql_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(sql_tenant_id), K(sql)); } } @@ -650,7 +672,7 @@ int ObTabletTableOperator::range_get( start_tablet_id.id(), range_size))) { LOG_WARN("fail to assign sql", KR(ret), K(sql)); - } else if (OB_FAIL(sql_proxy_->read(result, sql_tenant_id, sql.ptr()))) { + } else if (OB_FAIL(sql_proxy_->read(result, sql_tenant_id, sql.ptr(), group_id_))) { LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql_tenant_id), K(sql)); } else if (OB_ISNULL(result.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -756,7 +778,7 @@ int ObTabletTableOperator::inner_batch_remove_by_sql_( } if (FAILEDx(dml.splice_batch_delete_sql(OB_ALL_TABLET_META_TABLE_TNAME, sql))) { LOG_WARN("fail to splice batch delete sql", KR(ret), K(sql)); - } else if (OB_FAIL(sql_client.write(sql_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client.write(sql_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("execute sql failed", KR(ret), K(tenant_id), K(sql_tenant_id), K(sql)); } } @@ -818,7 +840,7 @@ int ObTabletTableOperator::remove_residual_tablet( server.get_port(), limit))) { LOG_WARN("assign sql string failed", KR(ret), K(sql)); - } else if (OB_FAIL(sql_client.write(sql_tenant_id, sql.ptr(), affected_rows))) { + } else if (OB_FAIL(sql_client.write(sql_tenant_id, sql.ptr(), group_id_, affected_rows))) { LOG_WARN("execute sql failed", KR(ret), K(sql), K(sql_tenant_id)); } else if (affected_rows > 0) { LOG_INFO("finish to remove residual tablet", KR(ret), K(tenant_id), K(affected_rows)); diff --git a/src/share/tablet/ob_tablet_table_operator.h b/src/share/tablet/ob_tablet_table_operator.h index f6f583ae30..47fa64e7a0 100644 --- a/src/share/tablet/ob_tablet_table_operator.h +++ b/src/share/tablet/ob_tablet_table_operator.h @@ -49,6 +49,7 @@ public: ObTabletTableOperator(); virtual ~ObTabletTableOperator(); int init(common::ObISQLClient &sql_proxy_); + int init(const int32_t group_id, common::ObISQLClient &sql_proxy); void reset(); void set_batch_size(int64_t batch_size) {batch_size_ = batch_size;} int get( @@ -144,6 +145,7 @@ public: common::ObISQLClient *sql_proxy, const uint64_t tenant_id, const ObIArray &tablet_ls_infos, + const int32_t group_id, ObIArray &tablet_infos); private: static int inner_batch_get_tablet_by_sql_( @@ -152,6 +154,7 @@ private: const ObIArray &tablet_ls_infos, const int64_t start_idx, const int64_t end_idx, + const int32_t group_id, ObIArray &tablet_infos); static int inner_batch_get_by_sql_( ObISQLClient &sql_client, @@ -185,6 +188,7 @@ private: bool inited_; common::ObISQLClient *sql_proxy_; int64_t batch_size_; + int32_t group_id_; }; } // end namespace share } // end namespace oceanbase diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index b4b46b3508..c74f30939c 100644 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -958,7 +958,7 @@ int ObMediumCompactionScheduleFunc::batch_check_medium_meta_table( LOG_WARN("failed to reserve array", KR(ret), "array_cnt", tablet_ls_infos.count()); } else if (OB_FAIL(init_tablet_filters(filters))) { LOG_WARN("failed to init tablet filters", K(ret)); - } else if (OB_FAIL(ObTabletTableOperator::batch_get_tablet_info(GCTX.sql_proxy_, MTL_ID(), tablet_ls_infos, tablet_infos))) { + } else if (OB_FAIL(ObTabletTableOperator::batch_get_tablet_info(GCTX.sql_proxy_, MTL_ID(), tablet_ls_infos, share::OBCG_STORAGE /*group_list*/, tablet_infos))) { LOG_WARN("failed to get tablet info", K(ret), K(tablet_ls_infos)); } else { time_guard.click(ObCompactionScheduleTimeGuard::SEARCH_META_TABLE); diff --git a/src/storage/high_availability/ob_finish_transfer.cpp b/src/storage/high_availability/ob_finish_transfer.cpp index 35e4767131..1d3e7eb3c9 100644 --- a/src/storage/high_availability/ob_finish_transfer.cpp +++ b/src/storage/high_availability/ob_finish_transfer.cpp @@ -373,7 +373,7 @@ int ObTxFinishTransfer::get_transfer_tablet_info_from_inner_table_( if (!task_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid args", K(ret), K(task_id)); - } else if (OB_FAIL(ObTransferTaskOperator::get(*sql_proxy_, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE_HA_LEVEL2))) { + } else if (OB_FAIL(ObTransferTaskOperator::get(*sql_proxy_, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(task_id)); } else if (OB_FAIL(tablet_list.assign(transfer_task.get_tablet_list()))) { LOG_WARN("failed to assign tablet_list", KR(ret), K(transfer_task)); @@ -523,7 +523,7 @@ int ObTxFinishTransfer::inner_check_ls_logical_table_replaced_(const uint64_t te const common::ObAddr &server = *location; const int64_t timeout = GCONF.rpc_timeout; const int64_t cluster_id = GCONF.cluster_id; - const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + const uint64_t group_id = share::OBCG_STORAGE; ObCheckTransferTabletBackfillArg arg; arg.tenant_id_ = tenant_id; arg.ls_id_ = dest_ls_id; @@ -723,7 +723,7 @@ int ObTxFinishTransfer::check_all_ls_replica_replay_scn_(const ObTransferTaskID int tmp_ret = OB_SUCCESS; int64_t cur_quorum = 0; common::ObArray member_addr_list; - const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + const int32_t group_id = share::OBCG_STORAGE; if (OB_FAIL(ObTransferUtils::get_need_check_member(total_addr_list, finished_addr_list, member_addr_list))) { LOG_WARN("failed to get need check member", K(ret), K(task_id), K(tenant_id), K(ls_id)); } else if (OB_FAIL(ObTransferUtils::check_ls_replay_scn( @@ -797,7 +797,7 @@ int ObTxFinishTransfer::unlock_ls_member_list_(const uint64_t tenant_id, const s { int ret = OB_SUCCESS; if (OB_FAIL(ObMemberListLockUtils::unlock_ls_member_list( - tenant_id, ls_id, task_id_.id(), member_list, status, share::OBCG_STORAGE_HA_LEVEL2, *sql_proxy_))) { + tenant_id, ls_id, task_id_.id(), member_list, status, share::OBCG_STORAGE, *sql_proxy_))) { LOG_WARN("failed to unlock ls member list", K(ret), K(tenant_id), K(ls_id), K_(task_id), K(status)); } return ret; @@ -808,7 +808,7 @@ int ObTxFinishTransfer::lock_ls_member_list_(const uint64_t tenant_id, const sha { int ret = OB_SUCCESS; if (OB_FAIL(ObMemberListLockUtils::lock_ls_member_list( - tenant_id, ls_id, task_id_.id(), member_list, status, share::OBCG_STORAGE_HA_LEVEL2, *sql_proxy_))) { + tenant_id, ls_id, task_id_.id(), member_list, status, share::OBCG_STORAGE, *sql_proxy_))) { LOG_WARN("failed to unlock ls member list", K(ret), K(ls_id), K(member_list)); } else { #ifdef ERRSIM @@ -897,16 +897,16 @@ int ObTxFinishTransfer::update_transfer_task_result_(const ObTransferTaskID &tas const bool for_update = true; ObTransferStatus next_status; next_status = OB_SUCCESS == result ? ObTransferStatus::COMPLETED : ObTransferStatus::DOING; - if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE_HA_LEVEL2))) { + if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE))) { LOG_WARN("failed to get transfer task", K(ret), K(task_id), K(tenant_id)); } else if (transfer_task.get_start_scn() >= finish_scn) { ret = OB_ERR_UNEXPECTED; LOG_WARN("finish scn not expected", K(ret), K(transfer_task), K(finish_scn)); } else if (OB_FAIL(ObTransferTaskOperator::update_finish_scn( - trans, tenant_id, task_id, transfer_task.get_status(), finish_scn, share::OBCG_STORAGE_HA_LEVEL2))) { + trans, tenant_id, task_id, transfer_task.get_status(), finish_scn, share::OBCG_STORAGE))) { LOG_WARN("failed to update finish scn", K(ret), K(tenant_id), K(task_id), K(finish_scn)); } else if (OB_FAIL(ObTransferTaskOperator::finish_task( - trans, tenant_id, task_id, transfer_task.get_status(), next_status, result, ObTransferTaskComment::EMPTY_COMMENT, share::OBCG_STORAGE_HA_LEVEL2))) { + trans, tenant_id, task_id, transfer_task.get_status(), next_status, result, ObTransferTaskComment::EMPTY_COMMENT, share::OBCG_STORAGE))) { LOG_WARN("failed to finish task", K(ret), K(tenant_id), K(task_id)); } #ifdef ERRSIM @@ -986,7 +986,7 @@ int ObTxFinishTransfer::start_trans_( LOG_WARN("fail to set trx timeout", K(ret), K(stmt_timeout)); } else if (OB_FAIL(timeout_ctx.set_timeout(stmt_timeout))) { LOG_WARN("set timeout context failed", K(ret)); - } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, false/*with_snapshot*/, share::OBCG_STORAGE_HA_LEVEL2))) { + } else if (OB_FAIL(trans.start(sql_proxy_, tenant_id, false/*with_snapshot*/, share::OBCG_STORAGE))) { LOG_WARN("failed to start trans", K(ret), K(tenant_id)); } else { LOG_INFO("start trans", K(tenant_id)); @@ -1027,7 +1027,7 @@ int ObTxFinishTransfer::select_transfer_task_for_update_(const ObTransferTaskID if (!task_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid arg", K(ret), K(task_id)); - } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task, share::OBCG_STORAGE_HA_LEVEL2))) { + } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task, share::OBCG_STORAGE))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(task_id)); } else if (!task.get_status().is_doing_status()) { ret = OB_STATE_NOT_MATCH; diff --git a/src/storage/high_availability/ob_ls_member_list_service.cpp b/src/storage/high_availability/ob_ls_member_list_service.cpp index adc63c508c..08711feec9 100644 --- a/src/storage/high_availability/ob_ls_member_list_service.cpp +++ b/src/storage/high_availability/ob_ls_member_list_service.cpp @@ -293,7 +293,7 @@ int ObLSMemberListService::get_config_version_and_transfer_scn_( arg.need_get_config_version_ = need_get_config_version; const int64_t cluster_id = GCONF.cluster_id; const int64_t timeout = GCONF.sys_bkgd_migration_change_member_list_timeout; - const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + const uint64_t group_id = share::OBCG_STORAGE; if (OB_FAIL(proxy.call(addr, timeout, cluster_id, diff --git a/src/storage/high_availability/ob_storage_ha_reader.cpp b/src/storage/high_availability/ob_storage_ha_reader.cpp index fa7d846273..51e9070ccc 100644 --- a/src/storage/high_availability/ob_storage_ha_reader.cpp +++ b/src/storage/high_availability/ob_storage_ha_reader.cpp @@ -232,9 +232,10 @@ int ObCopyMacroBlockObReader::init( if (arg.get_serialize_size() > OB_MALLOC_BIG_BLOCK_SIZE) { ret = OB_ERR_SYS; LOG_ERROR("rpc arg must not larger than packet size", K(ret), K(arg.get_serialize_size())); - } else if (OB_FAIL(param.svr_rpc_proxy_->to(param.src_info_.src_addr_).by(OB_DATA_TENANT_ID).dst_cluster_id(param.src_info_.cluster_id_) + } else if (OB_FAIL(param.svr_rpc_proxy_->to(param.src_info_.src_addr_).by(param.tenant_id_).dst_cluster_id(param.src_info_.cluster_id_) .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) .timeout(rpc_timeout) + .group_id(share::OBCG_STORAGE_STREAM) .fetch_macro_block(arg, rpc_buffer_, handle_))) { LOG_WARN("failed to send fetch macro block rpc", K(param), K(ret)); } else { @@ -822,8 +823,9 @@ int ObCopyTabletInfoObReader::init( } else if (OB_FAIL(rpc_reader_.init(bandwidth_throttle))) { LOG_WARN("fail to init tablet info rpc reader", K(ret)); } else if (FALSE_IT(rpc_timeout = ObStorageHAUtils::get_rpc_timeout())) { - } else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(OB_DATA_TENANT_ID).timeout(rpc_timeout).dst_cluster_id(src_info.cluster_id_) + } else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(rpc_arg.tenant_id_).timeout(rpc_timeout).dst_cluster_id(src_info.cluster_id_) .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) + .group_id(share::OBCG_STORAGE_STREAM) .fetch_tablet_info(rpc_arg, rpc_reader_.get_rpc_buffer(), rpc_reader_.get_handle()))) { LOG_WARN("failed to send fetch tablet info rpc", K(ret), K(src_info), K(rpc_arg)); } else { @@ -1089,9 +1091,10 @@ int ObCopySSTableInfoObReader::init( } else if (OB_FAIL(rpc_reader_.init(bandwidth_throttle))) { LOG_WARN("fail to init tablet info rpc reader", K(ret)); } else if (FALSE_IT(rpc_timeout = ObStorageHAUtils::get_rpc_timeout())) { - } else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(OB_DATA_TENANT_ID) + } else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(rpc_arg.tenant_id_) .timeout(rpc_timeout).dst_cluster_id(src_info.cluster_id_) .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) + .group_id(share::OBCG_STORAGE_STREAM) .fetch_tablet_sstable_info(rpc_arg, rpc_reader_.get_rpc_buffer(), rpc_reader_.get_handle()))) { LOG_WARN("failed to send fetch tablet info rpc", K(ret), K(src_info), K(rpc_arg)); } else { @@ -1950,9 +1953,10 @@ int ObCopySSTableMacroObReader::init( } else if (OB_FAIL(rpc_reader_.init(bandwidth_throttle))) { LOG_WARN("fail to init tablet info rpc reader", K(ret)); } else if (FALSE_IT(rpc_timeout = ObStorageHAUtils::get_rpc_timeout())) { - } else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(OB_DATA_TENANT_ID) + } else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(rpc_arg.tenant_id_) .timeout(rpc_timeout).dst_cluster_id(src_info.cluster_id_) .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) + .group_id(share::OBCG_STORAGE_STREAM) .fetch_sstable_macro_info(rpc_arg, rpc_reader_.get_rpc_buffer(), rpc_reader_.get_handle()))) { LOG_WARN("failed to send fetch tablet info rpc", K(ret), K(src_info), K(rpc_arg)); } else { @@ -2480,9 +2484,10 @@ int ObCopyLSViewInfoObReader::init( ret = OB_TENANT_HAS_BEEN_DROPPED; LOG_WARN("tenant has been stopped, stop send get ls view rpc", K(ret), KPC(ls)); } else if (FALSE_IT(rpc_timeout = ObStorageHAUtils::get_rpc_timeout())) { - } else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(OB_DATA_TENANT_ID) + } else if (OB_FAIL(srv_rpc_proxy.to(src_info.src_addr_).by(rpc_arg.tenant_id_) .timeout(rpc_timeout).dst_cluster_id(src_info.cluster_id_) .ratelimit(true).bg_flow(obrpc::ObRpcProxy::BACKGROUND_FLOW) + .group_id(share::OBCG_STORAGE_STREAM) .fetch_ls_view(rpc_arg, rpc_reader_.get_rpc_buffer(), rpc_reader_.get_handle()))) { if (OB_TABLET_GC_LOCK_CONFLICT != ret) { LOG_WARN("failed to send fetch ls view info rpc", K(ret), K(src_info), K(rpc_arg)); diff --git a/src/storage/high_availability/ob_storage_ha_struct.cpp b/src/storage/high_availability/ob_storage_ha_struct.cpp index d348182d4a..af74a5cafd 100644 --- a/src/storage/high_availability/ob_storage_ha_struct.cpp +++ b/src/storage/high_availability/ob_storage_ha_struct.cpp @@ -543,7 +543,7 @@ int ObMigrationStatusHelper::check_ls_with_transfer_task_( ret = OB_ERR_UNEXPECTED; LOG_WARN("mysql proxy should not be NULL", K(ret), KP(sql_proxy)); } else if (OB_FAIL(ObTransferTaskOperator::get_by_src_ls( - *sql_proxy, tenant_id, src_ls_id, task, share::OBCG_STORAGE_HA_LEVEL2))) { + *sql_proxy, tenant_id, src_ls_id, task, share::OBCG_STORAGE))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(src_ls_id)); if (OB_ENTRY_NOT_EXIST == ret || OB_TABLE_NOT_EXIST == ret) { need_check_allow_gc = true; diff --git a/src/storage/high_availability/ob_storage_ha_utils.cpp b/src/storage/high_availability/ob_storage_ha_utils.cpp index e99b930f2d..9a3735cacb 100644 --- a/src/storage/high_availability/ob_storage_ha_utils.cpp +++ b/src/storage/high_availability/ob_storage_ha_utils.cpp @@ -173,7 +173,7 @@ int ObStorageHAUtils::fetch_src_tablet_meta_info_(const uint64_t tenant_id, cons int ret = OB_SUCCESS; ObTabletTableOperator op; ObTabletReplica tablet_replica; - if (OB_FAIL(op.init(sql_client))) { + if (OB_FAIL(op.init(share::OBCG_STORAGE, sql_client))) { LOG_WARN("failed to init operator", K(ret)); } else if (OB_FAIL(op.get(tenant_id, tablet_id, ls_id, src_addr, tablet_replica))) { LOG_WARN("failed to get tablet meta info", K(ret), K(tenant_id), K(tablet_id), K(ls_id), K(src_addr)); @@ -190,11 +190,13 @@ int ObStorageHAUtils::check_tablet_replica_checksum_(const uint64_t tenant_id, c ObArray items; ObArray pairs; ObTabletLSPair pair; + int64_t tablet_items_cnt = 0; if (OB_FAIL(pair.init(tablet_id, ls_id))) { LOG_WARN("failed to init pair", K(ret), K(tablet_id), K(ls_id)); } else if (OB_FAIL(pairs.push_back(pair))) { LOG_WARN("failed to push back", K(ret), K(pair)); - } else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_get(tenant_id, pairs, compaction_scn, sql_client, items))) { + } else if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_get(tenant_id, pairs, compaction_scn, + sql_client, items, tablet_items_cnt, false/*include_larger_than*/, share::OBCG_STORAGE/*group_id*/))) { LOG_WARN("failed to batch get replica checksum item", K(ret), K(tenant_id), K(pairs), K(compaction_scn)); } else { ObArray filter_items; diff --git a/src/storage/high_availability/ob_transfer_handler.cpp b/src/storage/high_availability/ob_transfer_handler.cpp index bf43a4e6f9..3c494ac967 100644 --- a/src/storage/high_availability/ob_transfer_handler.cpp +++ b/src/storage/high_availability/ob_transfer_handler.cpp @@ -176,7 +176,7 @@ int ObTransferHandler::get_transfer_task_from_inner_table_( if (! task_id.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid arg", K(ret), K(task_id)); - } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task, share::OBCG_STORAGE_HA_LEVEL1))) { + } else if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, task, share::OBCG_TRANSFER))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(task_id)); } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { LOG_WARN("failed to convert from transfer task", K(ret), K(task)); @@ -224,7 +224,7 @@ int ObTransferHandler::fetch_transfer_task_from_inner_table_by_src_ls_( const ObLSID &src_ls_id = ls_->get_ls_id(); ObTransferTask task; if (OB_FAIL(ObTransferTaskOperator::get_by_src_ls( - *sql_proxy_, tenant_id, src_ls_id, task, share::OBCG_STORAGE_HA_LEVEL2))) { + *sql_proxy_, tenant_id, src_ls_id, task, share::OBCG_STORAGE))) { LOG_WARN("failed to get transfer task", K(ret), K(tenant_id), K(src_ls_id)); } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { LOG_WARN("failed to convert from transfer task", K(ret), K(task)); @@ -250,7 +250,7 @@ int ObTransferHandler::fetch_transfer_task_from_inner_table_by_dest_ls_( const ObLSID &dest_ls_id = ls_->get_ls_id(); ObTransferTask task; if (OB_FAIL(ObTransferTaskOperator::get_by_dest_ls( - *sql_proxy_, tenant_id, dest_ls_id, task, share::OBCG_STORAGE_HA_LEVEL2))) { + *sql_proxy_, tenant_id, dest_ls_id, task, share::OBCG_STORAGE))) { LOG_WARN("failed to get transfer task by dest ls", K(ret), K(tenant_id), K(dest_ls_id)); } else if (OB_FAIL(task_info.convert_from(tenant_id, task))) { LOG_WARN("failed to convert from transfer task", K(ret), K(task)); @@ -618,7 +618,7 @@ int ObTransferHandler::lock_src_and_dest_ls_member_list_( } else if (OB_FAIL(lock_ls_list.push_back(dest_ls_id))) { LOG_WARN("failed to push back", K(ret), K(dest_ls_id)); } else if (OB_FAIL(ObMemberListLockUtils::batch_lock_ls_member_list(tenant_id, task_id, - lock_ls_list, member_list, status, share::OBCG_STORAGE_HA_LEVEL2, *sql_proxy_))) { + lock_ls_list, member_list, status, share::OBCG_STORAGE, *sql_proxy_))) { LOG_WARN("failed to batch lock ls member list", K(ret)); } else if (OB_FAIL(check_ls_member_list_same_(src_ls_id, dest_ls_id, member_list, is_same))) { LOG_WARN("failed to check ls member listsame", K(ret), K(src_ls_id), K(dest_ls_id)); @@ -700,7 +700,7 @@ int ObTransferHandler::inner_unlock_ls_member_list_( const int64_t task_id = task_info.task_id_.id(); const ObTransferLockStatus status(ObTransferLockStatus::START); if (OB_FAIL(ObMemberListLockUtils::unlock_ls_member_list( - tenant_id, ls_id, task_id, member_list, status, share::OBCG_STORAGE_HA_LEVEL2, *sql_proxy_))) { + tenant_id, ls_id, task_id, member_list, status, share::OBCG_STORAGE, *sql_proxy_))) { LOG_WARN("failed to lock ls member list", K(ret), K(task_info), K(ls_id), K(member_list)); } return ret; @@ -868,7 +868,7 @@ int ObTransferHandler::check_start_status_transfer_tablets_( arg.dest_ls_id_ = task_info.dest_ls_id_; const int64_t timeout = GCONF.rpc_timeout; const int64_t cluster_id = GCONF.cluster_id; - const uint64_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + const uint64_t group_id = share::OBCG_STORAGE; if (OB_FAIL(arg.tablet_list_.assign(task_info.tablet_list_))) { LOG_WARN("failed to assign tablet list", K(ret), K(task_info)); } else if (OB_FAIL(batch_proxy.call(addr, @@ -1192,7 +1192,7 @@ int ObTransferHandler::start_trans_( int64_t stmt_timeout = 10_s; const int64_t LOCK_MEMBER_LIST_TIMEOUT = 10_s; const bool with_snapshot = false; - const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL1; + const int32_t group_id = share::OBCG_TRANSFER; if (tenant_config.is_valid()) { stmt_timeout = tenant_config->_transfer_start_trans_timeout + LOCK_MEMBER_LIST_TIMEOUT; if (tenant_config->_enable_balance_kill_transaction) { @@ -1475,7 +1475,7 @@ int ObTransferHandler::wait_src_ls_replay_to_start_scn_( common::ObMemberList member_list; ObArray member_addr_list; const int64_t start_ts = ObTimeUtil::current_time(); - const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL1; + const int32_t group_id = share::OBCG_TRANSFER; if (!is_inited_) { ret = OB_NOT_INIT; @@ -1514,7 +1514,7 @@ int ObTransferHandler::precheck_ls_replay_scn_(const share::ObTransferTaskInfo & share::SCN check_scn; ObTimeoutCtx timeout_ctx; omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); - const int32_t group_id = share::OBCG_STORAGE_HA_LEVEL2; + const int32_t group_id = share::OBCG_STORAGE; if (tenant_config.is_valid()) { const int64_t timeout = tenant_config->_transfer_start_trans_timeout * 0.5; if (OB_FAIL(timeout_ctx.set_timeout(timeout))) { @@ -1853,7 +1853,7 @@ int ObTransferHandler::update_all_tablet_to_ls_( const ObTransferTabletInfo &tablet_info = task_info.tablet_list_.at(i); if (OB_FAIL(ObTabletToLSTableOperator::update_ls_id_and_transfer_seq(trans, task_info.tenant_id_, tablet_info.tablet_id_, tablet_info.transfer_seq_, task_info.src_ls_id_, - tablet_info.transfer_seq_ + 1, task_info.dest_ls_id_, share::OBCG_STORAGE_HA_LEVEL1))) { + tablet_info.transfer_seq_ + 1, task_info.dest_ls_id_, share::OBCG_TRANSFER))) { LOG_WARN("failed to update ls id and transfer seq", K(ret), K(tablet_info), K(task_info)); } } @@ -1922,7 +1922,7 @@ int ObTransferHandler::update_transfer_status_( } else if (!task_info.is_valid() || !next_status.is_valid()) { LOG_WARN("update transfer status get invalid argument", K(ret), K(task_info), K(next_status)); } else { - if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task, share::OBCG_STORAGE_HA_LEVEL1))) { + if (OB_FAIL(ObTransferTaskOperator::get(trans, tenant_id, task_id, for_update, transfer_task, share::OBCG_TRANSFER))) { LOG_WARN("failed to get transfer task", K(ret), K(task_id), K(tenant_id)); } else if (task_info.status_ != transfer_task.get_status() || task_info.src_ls_id_ != transfer_task.get_src_ls() @@ -1931,10 +1931,10 @@ int ObTransferHandler::update_transfer_status_( LOG_WARN("task info in not equal to inner table transfer task, unexpected", K(ret), K(task_info), K(transfer_task)); } else if (start_scn.is_valid() && OB_FAIL(ObTransferTaskOperator::update_start_scn( - trans, tenant_id, task_id, transfer_task.get_status(), start_scn, share::OBCG_STORAGE_HA_LEVEL1))) { + trans, tenant_id, task_id, transfer_task.get_status(), start_scn, share::OBCG_TRANSFER))) { LOG_WARN("failed to update finish scn", K(ret), K(tenant_id), K(task_id), K(start_scn)); } else if (OB_FAIL(ObTransferTaskOperator::update_status_and_result( - trans, tenant_id, task_id, transfer_task.get_status(), next_status, result, share::OBCG_STORAGE_HA_LEVEL1))) { + trans, tenant_id, task_id, transfer_task.get_status(), next_status, result, share::OBCG_TRANSFER))) { LOG_WARN("failed to finish task", K(ret), K(tenant_id), K(task_id)); } else { #ifdef ERRSIM diff --git a/src/storage/ob_storage_rpc.cpp b/src/storage/ob_storage_rpc.cpp index 0457633d6a..393f6984b6 100644 --- a/src/storage/ob_storage_rpc.cpp +++ b/src/storage/ob_storage_rpc.cpp @@ -3366,7 +3366,10 @@ int ObStorageRpc::post_ls_info_request( arg.ls_id_ = ls_id; if (OB_FAIL(ObStorageHAUtils::get_server_version(arg.version_))) { LOG_WARN("failed to get server version", K(ret), K(ls_id)); - } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_).fetch_ls_info(arg, ls_info))) { + } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_) + .by(tenant_id) + .group_id(share::OBCG_STORAGE) + .fetch_ls_info(arg, ls_info))) { LOG_WARN("failed to fetch ls info", K(ret), K(arg), K(src_info)); } else { FLOG_INFO("fetch ls info successfully", K(ls_info)); @@ -3395,7 +3398,10 @@ int ObStorageRpc::post_ls_meta_info_request( arg.ls_id_ = ls_id; if (OB_FAIL(ObStorageHAUtils::get_server_version(arg.version_))) { LOG_WARN("failed to get server version", K(ret), K(arg)); - } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_).fetch_ls_meta_info(arg, ls_info))) { + } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_) + .by(tenant_id) + .group_id(share::OBCG_STORAGE) + .fetch_ls_meta_info(arg, ls_info))) { LOG_WARN("failed to fetch ls info", K(ret), K(arg), K(src_info)); } else { FLOG_INFO("fetch ls meta info successfully", K(ls_info)); @@ -3424,7 +3430,7 @@ int ObStorageRpc::post_ls_member_list_request( arg.ls_id_ = ls_id; if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_) .by(tenant_id) - .group_id(share::OBCG_STORAGE_HA_LEVEL2) + .group_id(share::OBCG_STORAGE) .fetch_ls_member_list(arg, member_info))) { LOG_WARN("failed to fetch ls info", K(ret), K(arg), K(src_info)); } else { @@ -3477,7 +3483,10 @@ int ObStorageRpc::notify_restore_tablets( arg.ls_id_ = ls_id; arg.restore_status_ = restore_status; arg.leader_proposal_id_ = proposal_id; - if (OB_FAIL(rpc_proxy_->to(follower_info.src_addr_).dst_cluster_id(follower_info.cluster_id_).notify_restore_tablets(arg, restore_resp))) { + if (OB_FAIL(rpc_proxy_->to(follower_info.src_addr_).dst_cluster_id(follower_info.cluster_id_) + .by(tenant_id) + .group_id(share::OBCG_STORAGE) + .notify_restore_tablets(arg, restore_resp))) { LOG_WARN("failed to notify follower restore tablets", K(ret), K(arg), K(follower_info), K(ls_id), K(tablet_id_array)); } else { FLOG_INFO("notify follower restore tablets successfully", K(arg), K(follower_info), K(ls_id), K(tablet_id_array)); @@ -3505,7 +3514,10 @@ int ObStorageRpc::inquire_restore( arg.tenant_id_ = tenant_id; arg.ls_id_ = ls_id; arg.restore_status_ = restore_status; - if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_).inquire_restore(arg, restore_resp))) { + if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_) + .by(tenant_id) + .group_id(share::OBCG_STORAGE) + .inquire_restore(arg, restore_resp))) { LOG_WARN("failed to inquire restore", K(ret), K(arg), K(src_info)); } else { FLOG_INFO("inquire restore status successfully", K(arg), K(src_info)); @@ -3530,7 +3542,10 @@ int ObStorageRpc::update_ls_meta( ObRestoreUpdateLSMetaArg arg; arg.tenant_id_ = tenant_id; arg.ls_meta_package_ = ls_meta; - if (OB_FAIL(rpc_proxy_->to(dest_info.src_addr_).dst_cluster_id(dest_info.cluster_id_).update_ls_meta(arg))) { + if (OB_FAIL(rpc_proxy_->to(dest_info.src_addr_).dst_cluster_id(dest_info.cluster_id_) + .by(tenant_id) + .group_id(share::OBCG_STORAGE) + .update_ls_meta(arg))) { LOG_WARN("failed to update ls meta", K(ret), K(dest_info), K(ls_meta)); } else { FLOG_INFO("update ls meta succ", K(dest_info), K(ls_meta)); @@ -3562,7 +3577,7 @@ int ObStorageRpc::get_ls_active_trans_count( if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) - .group_id(share::OBCG_STORAGE_HA_LEVEL2) + .group_id(share::OBCG_STORAGE) .get_ls_active_trans_count(arg, res))) { LOG_WARN("failed to get ls active trans count", K(ret), K(src_info), K(arg)); } else { @@ -3604,7 +3619,7 @@ int ObStorageRpc::get_transfer_start_scn( .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) .timeout(get_transfer_start_scn_timeout) - .group_id(share::OBCG_STORAGE_HA_LEVEL1) + .group_id(share::OBCG_TRANSFER) .get_transfer_start_scn(arg, res))) { LOG_WARN("failed to get transfer start scn", K(ret), K(src_info), K(arg)); } else { @@ -3636,7 +3651,7 @@ int ObStorageRpc::submit_tx_log( if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) - .group_id(share::OBCG_STORAGE_HA_LEVEL2) + .group_id(share::OBCG_STORAGE) .submit_tx_log(arg, end_scn))) { LOG_WARN("failed to submit tx log", K(ret), K(src_info), K(arg)); } else { @@ -3667,7 +3682,7 @@ int ObStorageRpc::get_transfer_dest_prepare_scn( if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) - .group_id(share::OBCG_STORAGE_HA_LEVEL2) + .group_id(share::OBCG_STORAGE) .get_transfer_dest_prepare_scn(arg, ret_scn))) { LOG_WARN("failed to get transfer_dest_prepare_scn", K(ret), K(src_info), K(arg)); } else { @@ -3804,7 +3819,7 @@ int ObStorageRpc::wakeup_transfer_service( if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_) .by(tenant_id) .dst_cluster_id(src_info.cluster_id_) - .group_id(share::OBCG_STORAGE_HA_LEVEL2) + .group_id(share::OBCG_STORAGE) .wakeup_transfer_service(arg))) { LOG_WARN("failed to wakeup transfer service", K(ret), K(src_info), K(arg)); } @@ -3827,6 +3842,8 @@ int ObStorageRpc::fetch_ls_member_and_learner_list( ret = OB_NOT_INIT; STORAGE_LOG(WARN, "storage rpc is not inited", K(ret)); } else if (OB_FAIL(rpc_proxy_->to(src_info.src_addr_).dst_cluster_id(src_info.cluster_id_) + .by(tenant_id) + .group_id(share::OBCG_STORAGE) .fetch_ls_member_and_learner_list(arg, member_info))) { LOG_WARN("fail to check ls is valid member", K(ret), K(tenant_id), K(ls_id)); } diff --git a/src/storage/restore/ob_ls_restore_handler.cpp b/src/storage/restore/ob_ls_restore_handler.cpp index d3162b6056..48dfafb202 100644 --- a/src/storage/restore/ob_ls_restore_handler.cpp +++ b/src/storage/restore/ob_ls_restore_handler.cpp @@ -381,7 +381,7 @@ int ObLSRestoreHandler::check_restore_job_exist_(bool &is_exist) if (OB_ISNULL(sql_proxy_ = GCTX.sql_proxy_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql prxoy must not be null", K(ret)); - } else if (OB_FAIL(restore_table_operator.init(sql_proxy_, tenant_id))) { + } else if (OB_FAIL(restore_table_operator.init(sql_proxy_, tenant_id, share::OBCG_STORAGE))) { LOG_WARN("failed to init restore table operator", K(ret), K(tenant_id)); } else if (OB_FAIL(restore_table_operator.get_job_by_tenant_id(tenant_id, job_info))) { if (ret == OB_ENTRY_NOT_EXIST) { @@ -866,7 +866,7 @@ int ObILSRestoreState::report_ls_restore_progress_( ls_key.job_id_ = ls_restore_arg_->get_job_id(); ls_key.ls_id_ = ls.get_ls_id(); ls_key.addr_ = self_addr_; - if (OB_FAIL(helper.init(ls_key.tenant_id_))) { + if (OB_FAIL(helper.init(ls_key.tenant_id_, share::OBCG_STORAGE))) { LOG_WARN("fail to init restore table helper", K(ret), "tenant_id", ls_key.tenant_id_); } else if (OB_FAIL(helper.update_ls_restore_status(*proxy_, ls_key, trace_id, status, result, comment))) { if (OB_ENTRY_NOT_EXIST == ret) { @@ -901,7 +901,7 @@ int ObILSRestoreState::insert_initial_ls_restore_progress_() ls_restore_info.key_.addr_ = self_addr_; ls_restore_info.restore_scn_ = ls_restore_arg_->get_restore_scn(); ls_restore_info.status_ = ObLSRestoreStatus::Status::RESTORE_START; - if (OB_FAIL(helper.init(ls_restore_info.key_.tenant_id_))) { + if (OB_FAIL(helper.init(ls_restore_info.key_.tenant_id_, share::OBCG_STORAGE))) { LOG_WARN("fail to init restore table helper", K(ret), "tenant_id", ls_restore_info.key_.tenant_id_); } else if (OB_FAIL(helper.insert_initial_ls_restore_progress(*proxy_, ls_restore_info))) { LOG_WARN("fail to insert initial ls restore progress info", K(ret), K(ls_restore_info)); @@ -1549,7 +1549,7 @@ int ObLSRestoreStartState::check_ls_leader_ready_(bool &is_ready) if (OB_FAIL(sql.assign_fmt("select count(*) ls_count from %s where ls_id=%ld and role = 1", OB_ALL_LS_META_TABLE_TNAME, ls_->get_ls_id().id()))) { LOG_WARN("fail to assign sql", K(ret)); - } else if (OB_FAIL(proxy_->read(res, gen_meta_tenant_id(tenant_id), sql.ptr()))) { + } else if (OB_FAIL(proxy_->read(res, gen_meta_tenant_id(tenant_id), sql.ptr(), share::OBCG_STORAGE))) { LOG_WARN("execute sql failed", K(ret), K(sql)); } else if (OB_ISNULL(result = res.get_result())) { ret = OB_ERR_UNEXPECTED; @@ -1631,7 +1631,7 @@ int ObLSRestoreStartState::inc_need_restore_ls_cnt_() key.tenant_id_ = ls_restore_arg_->tenant_id_; key.ls_id_ = ls_->get_ls_id(); key.addr_ = self_addr_; - if (OB_FAIL(helper.init(key.tenant_id_))) { + if (OB_FAIL(helper.init(key.tenant_id_, share::OBCG_STORAGE))) { LOG_WARN("fail to init helper", K(ret), K(key.tenant_id_)); } else if (OB_FAIL(trans.start(proxy_, gen_meta_tenant_id(key.tenant_id_)))) { LOG_WARN("fail to start trans", K(ret), K(key.tenant_id_)); @@ -1663,7 +1663,7 @@ int ObLSRestoreHandler::fill_restore_arg() if (OB_ISNULL(sql_proxy_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql can't null", K(ret), K(sql_proxy_)); - } else if (OB_FAIL(restore_table_operator.init(sql_proxy_, tenant_id))) { + } else if (OB_FAIL(restore_table_operator.init(sql_proxy_, tenant_id, share::OBCG_STORAGE))) { LOG_WARN("fail to init restore table operator", K(ret)); } else { HEAP_VAR(ObPhysicalRestoreJob, job_info) { @@ -1706,7 +1706,7 @@ int ObLSRestoreStartState::check_ls_created_(bool &is_created) if (OB_ISNULL(sql_proxy)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql proxy is nullptr is unexpected", K(ret)); - } else if (OB_FAIL(ls_status_operator.get_ls_status_info(user_tenant_id, ls_->get_ls_id(), status_info, *sql_proxy))) { + } else if (OB_FAIL(ls_status_operator.get_ls_status_info(user_tenant_id, ls_->get_ls_id(), status_info, *sql_proxy, share::OBCG_STORAGE))) { LOG_WARN("fail to get ls status info", K(ret), K(user_tenant_id), "ls_id", ls_->get_ls_id()); } else if (!status_info.ls_is_create_abort() && !status_info.ls_is_creating()) { is_created = true; @@ -2832,7 +2832,7 @@ int ObLSWaitRestoreConsistentScnState::check_can_advance_status_(bool &can) cons int ret = OB_SUCCESS; share::ObPhysicalRestoreTableOperator restore_table_operator; const uint64_t tenant_id = ls_->get_tenant_id(); - if (OB_FAIL(restore_table_operator.init(proxy_, tenant_id))) { + if (OB_FAIL(restore_table_operator.init(proxy_, tenant_id, share::OBCG_STORAGE))) { LOG_WARN("fail to init restore table operator", K(ret), K(tenant_id)); } else { ObLSRestoreStatus next_status(ObLSRestoreStatus::QUICK_RESTORE); diff --git a/src/storage/tx/ob_dup_table_util.cpp b/src/storage/tx/ob_dup_table_util.cpp index edefafcf1e..607b661729 100644 --- a/src/storage/tx/ob_dup_table_util.cpp +++ b/src/storage/tx/ob_dup_table_util.cpp @@ -89,7 +89,7 @@ int ObDupTabletScanTask::refresh_dup_tablet_schema_( share::ObLSStatusOperator ls_status_op; if (OB_FAIL(ls_status_op.get_duplicate_ls_status_info(MTL_ID(), *GCTX.sql_proxy_, - dup_ls_status_info))) { + dup_ls_status_info, share::OBCG_STORAGE))) { if (OB_ENTRY_NOT_EXIST == ret) { DUP_TABLE_LOG(DEBUG, "no duplicate ls", K(dup_ls_status_info)); ret = OB_SUCCESS;