diff --git a/src/sql/engine/px/ob_dfo_mgr.cpp b/src/sql/engine/px/ob_dfo_mgr.cpp index f5d0228f09..b3962a3365 100644 --- a/src/sql/engine/px/ob_dfo_mgr.cpp +++ b/src/sql/engine/px/ob_dfo_mgr.cpp @@ -285,7 +285,7 @@ int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr, // compatible with version before 4.2 compatible_before_420 = true; scale_rate = static_cast(admited_worker_count) / static_cast(expected_worker_count); - } else if (0 <= admited_worker_count || minimal_worker_count == admited_worker_count) { + } else if (0 >= admited_worker_count || minimal_worker_count == admited_worker_count) { scale_rate = 0.0; } else if (OB_UNLIKELY(minimal_worker_count > admited_worker_count || minimal_worker_count >= expected_worker_count)) { @@ -312,7 +312,8 @@ int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr, } LOG_TRACE("assign worker count to dfo", "dfo_id", child->get_dfo_id(), K(admited_worker_count), - K(expected_worker_count), "dop", child->get_dop(), K(scale_rate), K(val)); + K(expected_worker_count), K(minimal_worker_count), + "dop", child->get_dop(), K(scale_rate), K(val)); } // 因为上面取了 max,所以可能实际 assigned 的会超出 admission 数,这时应该报错 @@ -323,6 +324,7 @@ int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr, } else if (total_assigned > admited_worker_count && admited_worker_count != 0) { // 意味着某些 dfo 理论上一个线程都分不到 ret = OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH; + LOG_USER_ERROR(OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH, total_assigned); LOG_WARN("total assigned worker to dfos is more than admited_worker_count", K(total_assigned), K(admited_worker_count), diff --git a/src/sql/engine/px/ob_px_admission.cpp b/src/sql/engine/px/ob_px_admission.cpp index b9e8be4789..02637912ff 100644 --- a/src/sql/engine/px/ob_px_admission.cpp +++ b/src/sql/engine/px/ob_px_admission.cpp @@ -34,7 +34,12 @@ int ObPxAdmission::get_parallel_session_target(ObSQLSessionInfo &session, omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); if (OB_FAIL(OB_PX_TARGET_MGR.get_parallel_servers_target(tenant_id, parallel_servers_target))) { LOG_WARN("get parallel_servers_target failed", K(ret)); + } else if (OB_UNLIKELY(minimal_session_target > parallel_servers_target)) { + ret = OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH; + LOG_WARN("minimal_session_target is more than parallel_servers_target", K(ret), + K(minimal_session_target), K(parallel_servers_target)); } else if (OB_LIKELY(tenant_config.is_valid())) { + session_target = parallel_servers_target; int64_t pmas = tenant_config->_parallel_max_active_sessions; int64_t parallel_session_count; if (OB_FAIL(OB_PX_TARGET_MGR.get_parallel_session_count(tenant_id, parallel_session_count))) { @@ -44,10 +49,6 @@ int ObPxAdmission::get_parallel_session_target(ObSQLSessionInfo &session, // this is not good! We ensure this query can run with minimal threads here session_target = std::max(parallel_servers_target / pmas, minimal_session_target); } - } else if (OB_UNLIKELY(minimal_session_target > parallel_servers_target)) { - ret = OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH; - LOG_WARN("minimal_session_target is more than parallel_servers_target", K(ret), - K(minimal_session_target), K(parallel_servers_target)); } else { // tenant_config is invalid, use parallel_servers_target session_target = parallel_servers_target; diff --git a/src/sql/engine/px/ob_px_tenant_target_monitor.cpp b/src/sql/engine/px/ob_px_tenant_target_monitor.cpp index ac3ecbd928..0b0c0b7543 100644 --- a/src/sql/engine/px/ob_px_tenant_target_monitor.cpp +++ b/src/sql/engine/px/ob_px_tenant_target_monitor.cpp @@ -395,7 +395,7 @@ int ObPxTenantTargetMonitor::apply_target(hash::ObHashMap &work // read lock to avoid reset map. SpinRLockGuard rlock_guard(spin_lock_); // Just for avoid multiple SQL applications at the same time // for pmas - int64_t target = session_target != INT64_MAX ? session_target : parallel_servers_target_; + int64_t target = session_target; uint64_t version = version_; bool is_first_query = true; bool is_target_enough = true;