fix late materialization bug

This commit is contained in:
zl0
2021-09-24 19:19:08 +08:00
committed by wangzelin.wzl
parent bda3d4e6c5
commit 16b086a718
36 changed files with 496 additions and 201 deletions

View File

@ -1534,6 +1534,8 @@ int ObLogicalOperator::do_pre_traverse_operation(const TraverseOp& op, void* ctx
}
break;
}
case ALLOC_STARTUP_EXPR:
break;
default: {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected access of default branch", K(op), K(ret));
@ -1920,6 +1922,12 @@ int ObLogicalOperator::do_post_traverse_operation(const TraverseOp& op, void* ct
}
break;
}
case ALLOC_STARTUP_EXPR: {
if (OB_FAIL(allocate_startup_expr_post())) {
LOG_WARN("failed to alloc startup expr post", K(ret));
}
break;
}
default:
break;
}
@ -6704,6 +6712,74 @@ int ObLogicalOperator::generate_link_sql_pre(GenLinkStmtContext& link_ctx)
return OB_SUCCESS;
}
int ObLogicalOperator::allocate_startup_expr_post()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < get_num_of_child(); ++i) {
if (OB_FAIL(allocate_startup_expr_post(i))) {
LOG_WARN("failed to allocate startup expr post", K(i), K(ret));
}
}
return ret;
}
int ObLogicalOperator::allocate_startup_expr_post(int64_t child_idx)
{
int ret = OB_SUCCESS;
ObLogicalOperator *child = get_child(child_idx);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child", K(ret));
} else if (is_dml_operator() ||
log_op_def::LOG_TEMP_TABLE_INSERT == get_type()) {
//do nothing
} else if (child->get_startup_exprs().empty()) {
//do nothing
} else {
ObSEArray<ObRawExpr*, 4> non_startup_exprs, new_startup_exprs;
ObIArray<ObRawExpr*> &startup_exprs = child->get_startup_exprs();
for (int64_t i = 0; OB_SUCC(ret) && i < startup_exprs.count(); ++i) {
if (OB_ISNULL(startup_exprs.at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null expr", K(ret));
} else if (startup_exprs.at(i)->has_flag(CNT_ROWNUM) ||
startup_exprs.at(i)->has_flag(CNT_EXEC_PARAM)) {
if (OB_FAIL(non_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("failed to push back expr", K(ret));
}
} else if (OB_FAIL(new_startup_exprs.push_back(startup_exprs.at(i)))) {
LOG_WARN("failed to push back expr", K(ret));
}
}
if (OB_SUCC(ret)) {
if (get_startup_exprs().empty() &&
OB_FAIL(add_startup_exprs(new_startup_exprs))) {
LOG_WARN("failed to add startup exprs", K(ret));
} else {
bool mark_exchange_out = false;
if (log_op_def::LOG_EXCHANGE == child->get_type()) {
ObLogExchange *exchange_out = static_cast<ObLogExchange*>(child);
if (exchange_out->is_px_producer()) {
if (log_op_def::LOG_EXCHANGE == get_type()) {
ObLogExchange *exchange_in = static_cast<ObLogExchange*>(this);
if (!exchange_in->is_rescanable()) {
mark_exchange_out = true;
}
}
}
}
if (!mark_exchange_out) {
if (OB_FAIL(child->get_startup_exprs().assign(non_startup_exprs))) {
LOG_WARN("failed to assign exprs", K(ret));
}
}
}
}
}
return ret;
}
int ObLogicalOperator::allocate_link_node_above(int64_t child_idx)
{
int ret = OB_SUCCESS;