Fix temp table related bugs

This commit is contained in:
xianyu-w
2023-08-04 10:18:34 +00:00
committed by ob-robot
parent 558f74a2d6
commit 88430a9d96
30 changed files with 2750 additions and 1064 deletions

View File

@ -25,6 +25,7 @@
#include "sql/engine/cmd/ob_table_direct_insert_service.h"
#include "sql/dblink/ob_dblink_utils.h"
#include "sql/resolver/dml/ob_merge_stmt.h"
#include "sql/optimizer/ob_log_temp_table_insert.h"
using namespace oceanbase;
using namespace sql;
using namespace oceanbase::common;
@ -114,11 +115,74 @@ int ObOptimizer::get_optimization_cost(ObDMLStmt &stmt,
return ret;
}
int ObOptimizer::get_cte_optimization_cost(ObDMLStmt &root_stmt,
ObSelectStmt *cte_query,
ObIArray<ObSelectStmt *> &stmts,
double &cte_cost,
ObIArray<double> &costs)
{
int ret = OB_SUCCESS;
cte_cost = 0.0;
ctx_.set_cost_evaluation();
costs.reuse();
if (OB_ISNULL(ctx_.get_query_ctx()) ||
OB_ISNULL(ctx_.get_session_info())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("query ctx or session info is null", K(ret));
} else if (OB_FAIL(init_env_info(root_stmt))) {
LOG_WARN("failed to init env info", K(ret));
} else if (OB_FAIL(generate_plan_for_temp_table(root_stmt))) {
LOG_WARN("failed to generate plan for temp table", K(ret));
}
if (OB_SUCC(ret) && NULL != cte_query) {
bool find = false;
for (int64_t i = 0; OB_SUCC(ret) && !find && i < ctx_.get_temp_table_infos().count(); i ++) {
ObSqlTempTableInfo *temp_table_info = ctx_.get_temp_table_infos().at(i);
ObLogPlan *plan = NULL;
if (OB_ISNULL(temp_table_info) || OB_ISNULL(temp_table_info->table_plan_)
|| OB_ISNULL(plan = temp_table_info->table_plan_->get_plan())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret));
} else if (cte_query == temp_table_info->table_query_) {
find = true;
if (OB_FAIL(plan->allocate_temp_table_insert_as_top(temp_table_info->table_plan_,
temp_table_info))) {
LOG_WARN("failed to allocate temp table insert", K(ret));
} else {
cte_cost = temp_table_info->table_plan_->get_cost();
}
}
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < stmts.count(); i ++) {
ObLogPlan *plan = NULL;
ObSelectStmt *stmt = stmts.at(i);
ObLogicalOperator *best_plan = NULL;
if (OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret));
} else if (OB_ISNULL(plan = ctx_.get_log_plan_factory().create(ctx_, *stmt))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("failed to create plan", "stmt", ctx_.get_query_ctx()->get_sql_stmt(), K(ret));
} else if (OB_FAIL(plan->generate_raw_plan())) {
LOG_WARN("failed to perform optimization", K(ret));
} else if (OB_FAIL(plan->init_candidate_plans(plan->get_candidate_plans().candidate_plans_))) {
LOG_WARN("failed to do candi into", K(ret));
} else if (OB_FAIL(plan->get_candidate_plans().get_best_plan(best_plan))) {
LOG_WARN("failed to get best plan", K(ret));
} else if (OB_FAIL(costs.push_back(best_plan->get_cost()))) {
LOG_WARN("failed to push back", K(ret));
}
}
return ret;
}
int ObOptimizer::generate_plan_for_temp_table(ObDMLStmt &stmt)
{
int ret = OB_SUCCESS;
ObIArray<ObSqlTempTableInfo*> &temp_table_infos = ctx_.get_temp_table_infos();
if (OB_FAIL(collect_temp_tables(ctx_.get_allocator(), stmt, temp_table_infos))) {
if (OB_FAIL(ObSqlTempTableInfo::collect_temp_tables(ctx_.get_allocator(), stmt, temp_table_infos,
ctx_.get_query_ctx(), true))) {
LOG_WARN("failed to add all temp tables", K(ret));
} else if (temp_table_infos.empty()) {
//do nothing
@ -128,6 +192,9 @@ int ObOptimizer::generate_plan_for_temp_table(ObDMLStmt &stmt)
ObSelectLogPlan *temp_plan = NULL;
ObLogicalOperator *temp_op = NULL;
for (int64_t i = 0; OB_SUCC(ret) && i < temp_table_infos.count(); i++) {
ObRawExpr *temp_table_nonwhere_filter = NULL;
ObRawExpr *temp_table_where_filter = NULL;
bool can_push_to_where = true;
if (OB_ISNULL(temp_table_info = temp_table_infos.at(i)) ||
OB_ISNULL(ref_query = temp_table_info->table_query_)) {
ret = OB_ERR_UNEXPECTED;
@ -141,6 +208,13 @@ int ObOptimizer::generate_plan_for_temp_table(ObDMLStmt &stmt)
OPT_TRACE_TITLE("begin generate plan for temp table ", temp_table_info->table_name_);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(try_push_down_temp_table_filter(*temp_table_info,
temp_table_nonwhere_filter,
temp_table_where_filter))) {
LOG_WARN("failed to push down filter for temp table", K(ret));
} else if (NULL != temp_table_where_filter &&
OB_FAIL(temp_plan->get_pushdown_filters().push_back(temp_table_where_filter))) {
LOG_WARN("failed to push down filter", K(ret));
} else if (OB_FAIL(temp_plan->generate_raw_plan())) {
LOG_WARN("Failed to generate temp_plan for sub_stmt", K(ret));
} else if (OB_FAIL(temp_plan->get_candidate_plans().get_best_plan(temp_op))) {
@ -148,6 +222,9 @@ int ObOptimizer::generate_plan_for_temp_table(ObDMLStmt &stmt)
} else if (OB_ISNULL(temp_op)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (NULL != temp_table_nonwhere_filter &&
OB_FAIL(temp_op->get_filter_exprs().push_back(temp_table_nonwhere_filter))) {
LOG_WARN("failed to push back", K(ret));
} else {
temp_table_info->table_plan_ = temp_op;
OPT_TRACE_TITLE("end generate plan for temp table ", temp_table_info->table_name_);
@ -157,72 +234,131 @@ int ObOptimizer::generate_plan_for_temp_table(ObDMLStmt &stmt)
return ret;
}
int ObOptimizer::collect_temp_tables(ObIAllocator &allocator,
ObDMLStmt &stmt,
ObIArray<ObSqlTempTableInfo*> &temp_table_infos)
/**
* If every appearance of cte is accompanied by some filter,
* We can combine these filters to reduce the data materialized by cte.
* We try to push these filters to where condition.
* If all filters can be push to where condition, nonwhere_filter will be NULL.
* Otherwise, we might have both where_filter and nonwhere_filter.
* e.g.
* with cte as (select a,count(*) as cnt from t1 group by a)
* select * from cte where a = 1 and cnt = 1 union all select * from cte where a = 2 and cnt = 2;
* nonwhere_filter : (a = 1 and cnt = 1) or (a = 2 and cnt = 2)
* where_filter : (a = 1) or (a = 2)
*/
int ObOptimizer::try_push_down_temp_table_filter(ObSqlTempTableInfo &info,
ObRawExpr *&nonwhere_filter,
ObRawExpr *&where_filter)
{
int ret = OB_SUCCESS;
ObSqlTempTableInfo *temp_table_info = NULL;
void *ptr = NULL;
TableItem *table = NULL;
ObSEArray<ObSelectStmt*, 4> child_stmts;
if (OB_ISNULL(ctx_.get_query_ctx())) {
bool have_filter = true;
nonwhere_filter = NULL;
where_filter = NULL;
ObSEArray<ObIArray<ObRawExpr *> *, 4> temp_table_filters;
ObSEArray<const ObDMLStmt *, 4> parent_stmts;
ObSEArray<const ObSelectStmt *, 4> subqueries;
ObSEArray<int64_t, 4> table_ids;
for (int64_t i = 0; OB_SUCC(ret) && have_filter && i < info.table_infos_.count(); ++i) {
TableItem *table = info.table_infos_.at(i).table_item_;
ObDMLStmt *stmt = info.table_infos_.at(i).upper_stmt_;
if (OB_ISNULL(table) || OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret));
} else if (info.table_infos_.at(i).table_filters_.empty()) {
have_filter = false;
} else if (OB_FAIL(temp_table_filters.push_back(&info.table_infos_.at(i).table_filters_))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(parent_stmts.push_back(stmt))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(subqueries.push_back(table->ref_query_))) {
LOG_WARN("failed to push back", K(ret));
} else if (OB_FAIL(table_ids.push_back(table->table_id_))) {
LOG_WARN("failed to push back", K(ret));
}
}
if (OB_SUCC(ret) && have_filter) {
OPT_TRACE("pushdown filter into temp table:", info.table_query_);
bool can_push_all = false;
if (OB_FAIL(ObOptimizerUtil::split_or_filter_into_subquery(parent_stmts,
subqueries,
table_ids,
temp_table_filters,
ctx_,
where_filter,
can_push_all,
/*check_match_index = */false))) {
LOG_WARN("failed to split filter", K(ret));
} else if (can_push_all) {
// do nothing
} else if (OB_FAIL(push_down_temp_table_filter(info, nonwhere_filter))) {
LOG_WARN("failed to push down remain temp table filter", K(ret));
}
if (NULL != where_filter) {
OPT_TRACE("succeed to pushdown filter to where:", where_filter);
}
if (NULL != nonwhere_filter) {
OPT_TRACE("succeed to pushdown filter into the top of temp table:", nonwhere_filter);
}
}
return ret;
}
int ObOptimizer::push_down_temp_table_filter(ObSqlTempTableInfo &info,
ObRawExpr *&temp_table_filter)
{
int ret = OB_SUCCESS;
ObRawExprFactory &expr_factory = ctx_.get_expr_factory();
ObSQLSessionInfo *session_info = NULL;
temp_table_filter = NULL;
ObSEArray<ObRawExpr *, 8> and_exprs;
ObRawExpr *or_expr = NULL;
bool have_temp_table_filter = true;
if (OB_ISNULL(session_info = ctx_.get_session_info()) ||
OB_ISNULL(info.table_query_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(stmt.get_child_stmts(child_stmts))) {
LOG_WARN("failed to get child stmts", K(ret));
LOG_WARN("unexpect null param", K(session_info), K(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < child_stmts.count(); i++) {
if (OB_ISNULL(child_stmts.at(i))) {
for (int64_t i = 0; OB_SUCC(ret) && have_temp_table_filter && i < info.table_infos_.count(); ++i) {
have_temp_table_filter &= !info.table_infos_.at(i).table_filters_.empty();
}
for (int64_t i = 0; OB_SUCC(ret) && have_temp_table_filter && i < info.table_infos_.count(); ++i) {
ObDMLStmt *upper_stmt = info.table_infos_.at(i).upper_stmt_;
TableItem *table = info.table_infos_.at(i).table_item_;
ObIArray<ObRawExpr *> &table_filters = info.table_infos_.at(i).table_filters_;
ObSEArray<ObRawExpr *, 8> rename_exprs;
ObRawExpr *and_expr = NULL;
if (table_filters.empty() || OB_ISNULL(upper_stmt) ||
OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(SMART_CALL(collect_temp_tables(allocator, *child_stmts.at(i),
temp_table_infos)))) {
LOG_WARN("failed to add all temp tables", K(ret));
LOG_WARN("unexpect null table info", K(ret));
} else if (OB_FAIL(ObOptimizerUtil::rename_pushdown_filter(*upper_stmt,
*info.table_query_,
table->table_id_,
session_info,
expr_factory,
table_filters,
rename_exprs))) {
LOG_WARN("failed to rename push down preds", K(ret));
} else if (OB_FAIL(ObRawExprUtils::build_and_expr(expr_factory,
rename_exprs,
and_expr))) {
LOG_WARN("failed to build and expr", K(ret));
}
if (OB_SUCC(ret) && OB_FAIL(and_exprs.push_back(and_expr))) {
LOG_WARN("failed to push back expr", K(ret));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < stmt.get_table_items().count(); i++) {
bool find = true;
if (OB_ISNULL(table = stmt.get_table_items().at(i))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (!table->is_temp_table()) {
//do nothing
} else {
ObIArray<ObSqlTempTableInfo*> &temp_table_infos_ = ctx_.get_temp_table_infos();
find = false;
for (int64_t j = 0; OB_SUCC(ret) && !find && j < temp_table_infos_.count(); j++) {
ObSqlTempTableInfo* info = temp_table_infos_.at(j);
if (OB_ISNULL(info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null info", K(ret));
} else if (info->table_query_ == table->ref_query_) {
find = true;
table->ref_id_ = info->temp_table_id_;
}
}
}
if (OB_SUCC(ret) && !find) {
if (OB_ISNULL(table->ref_query_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(SMART_CALL(collect_temp_tables(allocator, *table->ref_query_,
temp_table_infos)))) {
LOG_WARN("failed to add all temp tables", K(ret));
} else if (OB_ISNULL(ptr = allocator.alloc(sizeof(ObSqlTempTableInfo)))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else {
temp_table_info = new (ptr) ObSqlTempTableInfo();
table->ref_id_ = ctx_.get_query_ctx()->available_tb_id_--;
temp_table_info->temp_table_id_ = table->ref_id_;
temp_table_info->table_name_ = table->table_name_;
temp_table_info->table_query_ = table->ref_query_;
if (OB_FAIL(temp_table_infos.push_back(temp_table_info))) {
LOG_WARN("failed to push back", K(ret));
}
}
}
if (OB_FAIL(ret) || !have_temp_table_filter) {
} else if (OB_FAIL(ObRawExprUtils::build_or_exprs(expr_factory,
and_exprs,
or_expr))) {
LOG_WARN("failed to build or expr", K(ret));
} else if (OB_FAIL(or_expr->formalize(session_info))) {
LOG_WARN("failed to formalize expr", K(ret));
} else if (OB_FAIL(or_expr->pull_relation_id())) {
LOG_WARN("failed to pull relation id and levels", K(ret));
} else {
temp_table_filter = or_expr;
}
return ret;
}