PATCH bugfix to opensource branch

This commit is contained in:
obdev
2021-07-19 22:33:13 +08:00
committed by wangzelin.wzl
parent 5b5c04ff49
commit e03cb03357
34 changed files with 1471 additions and 1150 deletions

View File

@ -6207,10 +6207,19 @@ int ObLogicalOperator::allocate_granule_nodes_above(AllocGIContext& ctx)
} else if (OB_ISNULL(get_plan())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Get unexpected null", K(ret), K(get_plan()));
} else if (LOG_TABLE_SCAN != get_type() && LOG_JOIN != get_type() && LOG_SET != get_type() &&
LOG_GROUP_BY != get_type() && LOG_DISTINCT != get_type() && LOG_SUBPLAN_FILTER != get_type() &&
LOG_WINDOW_FUNCTION != get_type() && LOG_UPDATE != get_type() && LOG_DELETE != get_type() &&
LOG_INSERT != get_type() && LOG_MERGE != get_type() && LOG_FOR_UPD != get_type()) {
} else if (LOG_TABLE_SCAN != get_type()
&& LOG_JOIN != get_type()
&& LOG_SET != get_type()
&& LOG_GROUP_BY != get_type()
&& LOG_DISTINCT != get_type()
&& LOG_LIMIT != get_type()
&& LOG_SUBPLAN_FILTER != get_type()
&& LOG_WINDOW_FUNCTION != get_type()
&& LOG_UPDATE != get_type()
&& LOG_DELETE != get_type()
&& LOG_INSERT != get_type()
&& LOG_MERGE != get_type()
&& LOG_FOR_UPD != get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Only special op can allocate a granule iterator", K(get_type()));
} else {
@ -6407,6 +6416,8 @@ int ObLogicalOperator::push_down_limit(AllocExchContext* ctx, ObRawExpr* limit_c
}
// push down limit expr
if (OB_SUCC(ret)) {
bool need_child_limit = true;
bool need_pws_limit = false;
if (OB_ISNULL(child = exchange_point->get_child(first_child))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(exchange_point), K(ret));
@ -6414,10 +6425,21 @@ int ObLogicalOperator::push_down_limit(AllocExchContext* ctx, ObRawExpr* limit_c
!is_virtual_table(static_cast<ObLogTableScan*>(child)->get_ref_table_id()) &&
NULL == static_cast<ObLogTableScan*>(child)->get_limit_expr()) {
// Do NOT allocate LIMIT operator, and push down limit onto table scan directly.
ObLogTableScan* table_scan = static_cast<ObLogTableScan*>(child);
ObLogTableScan *table_scan = static_cast<ObLogTableScan *>(child);
table_scan->set_limit_offset(new_limit_count_expr, NULL);
} else {
if (OB_FAIL(exchange_point->allocate_limit_below(first_child, new_limit_count_expr))) {
need_child_limit = true;
} else if (child->is_partition_wise()) {
ObLogicalOperator *exch = NULL;
if (ctx->exchange_allocated_ && OB_FAIL(child->find_first_recursive(LOG_EXCHANGE, exch))) {
LOG_WARN("failed to find first exchange", K(ret));
} else if (exch == NULL) {
need_pws_limit = true;
}
}
if (OB_SUCC(ret) && need_child_limit) {
if (OB_FAIL(exchange_point->allocate_limit_below(first_child,
new_limit_count_expr))) {
LOG_WARN("failed to allocte limit below", K(ret));
} else if (OB_ISNULL(child_limit = exchange_point->get_child(first_child))) {
ret = OB_ERR_UNEXPECTED;
@ -6433,6 +6455,27 @@ int ObLogicalOperator::push_down_limit(AllocExchContext* ctx, ObRawExpr* limit_c
child_limit->set_width(get_width());
}
}
if (OB_SUCC(ret) && need_pws_limit) {
ObLogicalOperator *pws_limit = NULL;
if (OB_FAIL(child_limit->allocate_limit_below(first_child,
new_limit_count_expr))) {
LOG_WARN("failed to allocte limit below", K(ret));
} else if (OB_ISNULL(pws_limit = child_limit->get_child(first_child))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected error", K(pws_limit), K(child), K(ret));
} else if (OB_FAIL(pws_limit->set_expected_ordering(get_expected_ordering()))) {
LOG_WARN("failed to set expected ordering", K(ret));
} else if (OB_FAIL(pws_limit->replace_generated_agg_expr(ctx->group_push_down_replaced_exprs_))) {
LOG_WARN("failed to replace agg expr", K(ret));
} else {
static_cast<ObLogLimit*>(pws_limit)->set_fetch_with_ties(is_fetch_with_ties);
pws_limit->set_card(get_card());
pws_limit->set_op_cost(get_op_cost());
pws_limit->set_width(get_width());
pws_limit->set_is_partition_wise(true);
}
}
}
}
return ret;