Fix dfo marked as is_single but dop is greater than 1

This commit is contained in:
obdev 2024-11-25 12:44:31 +00:00 committed by ob-robot
parent 00c31cb403
commit f74504d79a
5 changed files with 24 additions and 12 deletions

View File

@ -558,6 +558,7 @@ public:
void set_force_gen_local_plan() { force_local_plan_ = true; }
bool is_force_gen_local_plan() const { return force_local_plan_; }
void set_retry_info(const ObQueryRetryInfo *retry_info) { das_ctx_.get_location_router().set_retry_info(retry_info); }
bool is_use_adaptive_px_dop() const { return auto_dop_map_.size() > 0; }
private:
int build_temp_expr_ctx(const ObTempExpr &temp_expr, ObTempExprCtx *&temp_expr_ctx);

View File

@ -236,7 +236,8 @@ int ObDfoWorkerAssignment::calc_admited_worker_count(const ObIArray<ObDfo*> &dfo
int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr,
int64_t expected_worker_count,
int64_t minimal_worker_count,
int64_t admited_worker_count)
int64_t admited_worker_count,
bool use_adaptive_px_dop)
{
int ret = OB_SUCCESS;
/* 算法: */
@ -340,7 +341,8 @@ int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr,
if (OB_FAIL(ret)) {
} else if (OB_FAIL(get_dfos_worker_count(dfos, false, total_assigned))) {
LOG_WARN("failed to get dfos worker count", K(ret));
} else if (total_assigned > admited_worker_count && admited_worker_count != 0) {
} else if (!use_adaptive_px_dop && total_assigned > admited_worker_count
&& admited_worker_count != 0) {
// 意味着某些 dfo 理论上一个线程都分不到
ret = OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH;
LOG_USER_ERROR(OB_ERR_PARALLEL_SERVERS_TARGET_NOT_ENOUGH, total_assigned);
@ -454,8 +456,9 @@ int ObDfoMgr::init(ObExecContext &exec_ctx,
px_minimal,
px_admited))) {
LOG_WARN("fail to calc admited worler count", K(ret));
} else if (OB_FAIL(ObDfoWorkerAssignment::assign_worker(*this, px_expected, px_minimal, px_admited))) {
LOG_WARN("fail assign worker to dfos", K(ret), K(px_expected), K(px_minimal), K(px_admited));
} else if (OB_FAIL(ObDfoWorkerAssignment::assign_worker(
*this, px_expected, px_minimal, px_admited, exec_ctx.is_use_adaptive_px_dop()))) {
LOG_WARN("fail assign worker to dfos", K(ret), K(px_expected), K(px_minimal), K(px_admited));
} else {
inited_ = true;
}
@ -674,7 +677,8 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
// 修改成 is_local = true, dop = 1
dfo->set_coord_info_ptr(&px_coord_info);
dfo->set_single(transmit->is_px_single());
dfo->set_dop(get_adaptive_px_dop(*transmit, exec_ctx));
dfo->set_dop(transmit->is_px_single() ? transmit->get_px_dop() :
get_adaptive_px_dop(*transmit, exec_ctx));
dfo->set_qc_id(transmit->get_px_id());
dfo->set_dfo_id(transmit->get_dfo_id());
dfo->set_execution_id(exec_ctx.get_my_session()->get_current_execution_id());

View File

@ -100,7 +100,8 @@ public:
static int assign_worker(ObDfoMgr &dfo_mgr,
int64_t expected_worker_count,
int64_t minimal_worker_count,
int64_t admited_worker_count);
int64_t admited_worker_count,
bool use_adaptive_px_dop);
static int get_dfos_worker_count(const ObIArray<ObDfo*> &dfos,
const bool get_minimal,
int64_t &total_assigned);

View File

@ -40,7 +40,13 @@ int ObAdaptiveAutoDop::calculate_table_auto_dop(const ObPhysicalPlan &plan, Auto
} else if (OB_FAIL(map.create(common::hash::cal_next_prime(256), attr, attr))) {
LOG_WARN("create hash map failed", K(ret));
}
OZ(inner_calculate_table_auto_dop(*root_spec, map, table_dop, is_single_part));
if (OB_SUCC(ret)
&& OB_FAIL(inner_calculate_table_auto_dop(*root_spec, map, table_dop, is_single_part))) {
LOG_WARN("failed to inner calculate table auto dop", K(ret));
}
if (OB_FAIL(ret)) {
map.clear();
}
return ret;
}

View File

@ -1931,8 +1931,8 @@ int ObSqlPlanSet::try_get_local_plan(ObPlanCacheCtx &pc_ctx,
LOG_DEBUG("not local plan", K(real_type));
plan = NULL;
get_next = true;
} else if (GCONF._enable_adaptive_auto_dop && is_single_table_ && !is_contain_inner_table_
&& !plan->stat_.is_inner_) {
} else if (GCONF._enable_adaptive_auto_dop && plan->get_is_use_auto_dop() && is_single_table_
&& !is_contain_inner_table_ && !plan->stat_.is_inner_) {
int64_t dop = -1;
bool is_single_part = false;
ObAdaptiveAutoDop adaptive_auto_dop(exec_ctx);
@ -1999,8 +1999,8 @@ int ObSqlPlanSet::try_get_dist_plan(ObPlanCacheCtx &pc_ctx,
LOG_TRACE("failed to get dist plan", K(ret));
} else if (plan != NULL) {
LOG_TRACE("succeed to get dist plan", K(*plan));
if (GCONF._enable_adaptive_auto_dop && is_single_table_ && !is_contain_inner_table_
&& !plan->stat_.is_inner_) {
if (GCONF._enable_adaptive_auto_dop && plan->get_is_use_auto_dop() && is_single_table_
&& !is_contain_inner_table_ && !plan->stat_.is_inner_) {
int64_t dop = -1;
bool is_single_part = false;
ObAdaptiveAutoDop adaptive_auto_dop(exec_ctx);
@ -2009,7 +2009,7 @@ int ObSqlPlanSet::try_get_dist_plan(ObPlanCacheCtx &pc_ctx,
LOG_WARN("failed to calculate table auto dop", K(ret));
} else if (OB_FAIL(auto_dop_map.get_refactored(0, dop))) {
LOG_WARN("failed to get refactored", K(ret));
} else if (is_single_part && plan->get_is_use_auto_dop() && !pc_ctx.exist_local_plan_ && dop <= 1) {
} else if (is_single_part && !pc_ctx.exist_local_plan_ && dop <= 1) {
plan = NULL;
exec_ctx.set_force_gen_local_plan();
}