[CP] fix: use independent group for dbms_scheduler job rpc
This commit is contained in:
parent
5d0a6fec46
commit
17f8fe968f
@ -150,7 +150,7 @@ int64_t ObDBMSSchedJobMaster::run_job(ObDBMSSchedJobInfo &job_info, ObDBMSSchedJ
|
||||
job_key->get_job_name(),
|
||||
execute_addr,
|
||||
self_addr_,
|
||||
job_info.is_olap_async_job_class() ? share::OBCG_OLAP_ASYNC_JOB : 0))) {
|
||||
job_info.is_olap_async_job_class() ? share::OBCG_OLAP_ASYNC_JOB : share::OBCG_DBMS_SCHED_JOB))) {
|
||||
LOG_WARN("failed to run dbms sched job", K(ret), K(job_info), KPC(job_key));
|
||||
if (is_server_down_error(ret)) {
|
||||
int tmp = OB_SUCCESS;
|
||||
|
@ -451,11 +451,16 @@ int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num, bool fo
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline bool is_dbms_job_group(int64_t group_id)
|
||||
{
|
||||
return share::OBCG_DBMS_SCHED_JOB == group_id || share::OBCG_OLAP_ASYNC_JOB == group_id;
|
||||
}
|
||||
|
||||
void ObResourceGroup::check_worker_count()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_SUCC(workers_lock_.trylock())) {
|
||||
if ((is_resource_manager_group(group_id_) || is_job_group(group_id_))
|
||||
if ((is_resource_manager_group(group_id_) || is_dbms_job_group(group_id_))
|
||||
&& nesting_worker_cnt_ < (MAX_REQUEST_LEVEL - GROUP_MULTI_LEVEL_THRESHOLD)) {
|
||||
for (int level = GROUP_MULTI_LEVEL_THRESHOLD + nesting_worker_cnt_; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
|
||||
if (OB_SUCC(acquire_level_worker(level))) {
|
||||
@ -1386,7 +1391,7 @@ int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
|
||||
if (req_level < 0) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("unexpected level", K(req_level), K(id_), K(group_id));
|
||||
} else if ((is_resource_manager_group(group_id) || is_job_group(group_id)) && req_level >= GROUP_MULTI_LEVEL_THRESHOLD) {
|
||||
} else if ((is_resource_manager_group(group_id) || is_dbms_job_group(group_id)) && req_level >= GROUP_MULTI_LEVEL_THRESHOLD) {
|
||||
group->recv_level_rpc_cnt_.atomic_inc(req_level);
|
||||
if (OB_FAIL(group->multi_level_queue_.push(req, req_level, 0))) {
|
||||
LOG_WARN("push request to queue fail", K(req_level), K(id_), K(group_id));
|
||||
|
@ -284,7 +284,6 @@ public:
|
||||
int64_t max_worker_cnt() const;
|
||||
ObTenant *get_tenant() { return tenant_; }
|
||||
share::ObCgroupCtrl *get_cgroup_ctrl() { return cgroup_ctrl_; }
|
||||
bool is_job_group(int64_t group_id) { return share::OBCG_OLAP_ASYNC_JOB == group_id; }
|
||||
|
||||
int init();
|
||||
void update_queue_size();
|
||||
@ -549,7 +548,6 @@ private:
|
||||
int construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantModuleInitCtx *&ctx);
|
||||
|
||||
int recv_group_request(rpc::ObRequest &req, int64_t group_id);
|
||||
bool is_job_group(int64_t group_id) { return share::OBCG_OLAP_ASYNC_JOB == group_id; }
|
||||
protected:
|
||||
|
||||
mutable common::TCRWLock meta_lock_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user