optimize calculation of px expected worker count

This commit is contained in:
sdc
2024-02-28 03:44:38 +00:00
committed by ob-robot
parent 8cf87984db
commit ba233cf3c1
10 changed files with 578 additions and 119 deletions

View File

@ -433,7 +433,9 @@ ObLogicalOperator::ObLogicalOperator(ObLogPlan &plan)
need_late_materialization_(false),
op_exprs_(),
inherit_sharding_index_(-1),
need_osg_merge_(false)
need_osg_merge_(false),
max_px_thread_branch_(OB_INVALID_INDEX),
max_px_group_branch_(OB_INVALID_INDEX)
{
}
@ -5961,3 +5963,191 @@ int ObLogicalOperator::alloc_nodes_above(AllocOpContext& ctx, const uint64_t &fl
}
return ret;
}
int ObLogicalOperator::open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG)
{
int ret = OB_SUCCESS;
ObLogicalOperator *child = NULL;
for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) {
if (OB_ISNULL(get_child(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("child op is null", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (is_block_op()) {
// block operator is usually single-child, so it doesn't matter whether consume child 1by1 actually.
if (is_consume_child_1by1()) {
// open and close child one by one.
for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) {
child = get_child(i);
if (OB_FAIL(SMART_CALL(child->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("open px resource analyze failed", K(ret));
} else if (OB_FAIL(SMART_CALL(child->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("close px resource analyze failed", K(ret));
}
}
} else {
// open all then close all
for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) {
if (OB_FAIL(SMART_CALL(get_child(i)->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("open px resource analyze failed", K(ret));
}
}
for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) {
if (OB_FAIL(SMART_CALL(get_child(i)->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("close px resource analyze failed", K(ret));
}
}
}
} else if (is_consume_child_1by1()) {
// open and close all block input. for example: first child of HASH JOIN, HASH SET(except UNION)
int64_t child_idx = 0;
for (; child_idx < get_num_of_child() && is_block_input(child_idx) && OB_SUCC(ret); child_idx++) {
child = get_child(child_idx);
if (OB_FAIL(SMART_CALL(child->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("open px resource analyze failed", K(ret));
} else if (OB_FAIL(SMART_CALL(child->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("close px resource analyze failed", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(find_max_px_resource_child(OPEN_PX_RESOURCE_ANALYZE_ARG, child_idx))) {
LOG_WARN("find max px resource child failed", K(ret));
} else {
for (; child_idx < get_num_of_child() && OB_SUCC(ret); child_idx++) {
if (child_idx == max_px_thread_branch_ || child_idx == max_px_group_branch_) {
// skip, open later
} else if (OB_FAIL(SMART_CALL(get_child(child_idx)->open_px_resource_analyze(
OPEN_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("child open px resource analyze failed", K(ret));
} else if (OB_FAIL(SMART_CALL(get_child(child_idx)->close_px_resource_analyze(
CLOSE_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("child close px resource analyze failed", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(OB_INVALID_INDEX == max_px_thread_branch_ ||
OB_INVALID_INDEX == max_px_group_branch_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("child with max parallel thread/group not found", K(ret));
} else if (max_px_thread_branch_ >= get_num_of_child()) {
// all inputs are block, do nothing.
} else if (OB_FAIL(SMART_CALL(get_child(max_px_thread_branch_)->open_px_resource_analyze(
OPEN_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("child open px resource analyze failed", K(ret));
// open both child with max thread and child with max group. may be inaccurate, by design.
} else if (max_px_group_branch_ != max_px_thread_branch_ &&
OB_FAIL(SMART_CALL(get_child(max_px_group_branch_)->open_px_resource_analyze(
OPEN_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("child open px resource analyze failed", K(ret));
}
}
} else {
// non block op and not consume child one by one, open all children. example: nlj, merge union distinct.
for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) {
if (OB_FAIL(SMART_CALL(get_child(i)->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("open px resource analyze failed", K(ret));
}
}
}
return ret;
}
int ObLogicalOperator::close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_DECLARE_ARG)
{
int ret = OB_SUCCESS;
// close children that are opened and not closed in open_px_resource_analyze.
ObLogicalOperator *child = NULL;
if (is_block_op()) {
// do nothing because all children have been closed.
} else if (is_consume_child_1by1()) {
if (max_px_thread_branch_ >= get_num_of_child()) {
// all children are block input and have been closed, do nothing.
} else if (OB_FAIL(SMART_CALL(get_child(max_px_thread_branch_)->close_px_resource_analyze(
CLOSE_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("child open px resource analyze failed", K(ret));
// close both child with max thread and child with max group.
} else if (max_px_group_branch_ != max_px_thread_branch_ &&
OB_FAIL(SMART_CALL(get_child(max_px_group_branch_)->close_px_resource_analyze(
CLOSE_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("child open px resource analyze failed", K(ret));
}
} else {
for (int64_t i = 0; i < get_num_of_child() && OB_SUCC(ret); i++) {
if (OB_FAIL(SMART_CALL(get_child(i)->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("open px resource analyze failed", K(ret));
}
}
}
return ret;
}
/* search for child with max running thread/group count.
* NESTEDLOOP JOIN
* UNION ALL PX4(dop=5)
* HASH JOIN PX3(dop=5)
* PX1(dop=10) PX2(dop=2)
* When nlj is outputting data, both union-all and PX4 are opened, the data of UNION-ALL may be from
* HASH JOIN or PX3, we need to know when the px work count reaches the maximum.
* When the data of UNION-ALL is from HASH JOIN, the px worker count of the plan equals to PX2 + PX4 = 7.
* When the data of UNION-ALL is from PX3, the px worker count of the plan equals to PX3 + PX4 = 10.
* Consider that before UNION-ALL output data, PX1 has to be opened and closed,
* so the final expected_px_worker_cnt = max(PX1, max(PX2 + PX4, PX3 + PX4) = 10.
* This function is to search for the child of UNION-ALL kept in open state when UNION-ALL is open.
* In the above plan, the result is 1.
*/
int ObLogicalOperator::find_max_px_resource_child(OPEN_PX_RESOURCE_ANALYZE_DECLARE_ARG,
int64_t first_nonblock_child)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(first_nonblock_child >= get_num_of_child() - 1)) {
max_px_thread_branch_ = first_nonblock_child;
max_px_group_branch_ = first_nonblock_child;
LOG_TRACE("[PxResAnaly] find max px resource child", K(get_op_id()), K(max_px_thread_branch_),
K(max_px_group_branch_));
} else if (OB_INVALID_INDEX == max_px_thread_branch_) {
int64_t ori_thread_cnt = cur_parallel_thread_count;
int64_t ori_group_cnt = cur_parallel_group_count;
int64_t max_child_thread_cnt = -1;
int64_t max_child_group_cnt = -1;
ObLogicalOperator *child = NULL;
bool append_map = false;
for (int64_t i = first_nonblock_child; i < get_num_of_child() && OB_SUCC(ret); i++) {
if (OB_ISNULL(child = get_child(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("child is null", K(ret));
} else if (OB_FAIL(SMART_CALL(child->open_px_resource_analyze(OPEN_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("child open px resource analyze failed", K(ret));
} else {
int64_t thread_inc = cur_parallel_thread_count - ori_thread_cnt;
int64_t group_inc = cur_parallel_group_count - ori_group_cnt;
if (thread_inc > max_child_thread_cnt) {
max_child_thread_cnt = thread_inc;
max_px_thread_branch_ = i;
if (group_inc == max_child_group_cnt) {
// make max_px_group_branch_ be equal to max_px_thread_branch_ if possible.
max_px_group_branch_ = i;
}
}
if (group_inc > max_child_group_cnt) {
max_child_group_cnt = group_inc;
max_px_group_branch_ = i;
if (thread_inc == max_child_thread_cnt) {
max_px_thread_branch_ = i;
}
}
if (OB_FAIL(SMART_CALL(child->close_px_resource_analyze(CLOSE_PX_RESOURCE_ANALYZE_ARG)))) {
LOG_WARN("child close px resource analyze failed", K(ret));
} else {
OB_ASSERT(cur_parallel_thread_count == ori_thread_cnt);
OB_ASSERT(cur_parallel_group_count == ori_group_cnt);
}
}
}
LOG_TRACE("[PxResAnaly] find max px resource child", K(get_op_id()), K(max_px_thread_branch_),
K(max_px_group_branch_));
}
return ret;
}