fix mv rewrite on multi part mview
This commit is contained in:
parent
7e3d97f4bd
commit
324322d3dc
@ -43,6 +43,30 @@ int ObTransformMVRewrite::transform_one_stmt(common::ObIArray<ObParentDMLStmt> &
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObTransformMVRewrite::~ObTransformMVRewrite() {
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(ctx_) || OB_ISNULL(ctx_->stmt_factory_)) {
|
||||
// do nothing
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < mv_infos_.count(); ++i) {
|
||||
if (OB_ISNULL(mv_infos_.at(i).view_stmt_)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObTransformUtils::free_stmt(*ctx_->stmt_factory_, mv_infos_.at(i).view_stmt_))) {
|
||||
LOG_WARN("failed to free stmt", K(ret));
|
||||
} else {
|
||||
mv_infos_.at(i).view_stmt_ = NULL;
|
||||
}
|
||||
if (OB_FAIL(ret) || OB_ISNULL(mv_infos_.at(i).select_mv_stmt_)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObTransformUtils::free_stmt(*ctx_->stmt_factory_, mv_infos_.at(i).select_mv_stmt_))) {
|
||||
LOG_WARN("failed to free stmt", K(ret));
|
||||
} else {
|
||||
mv_infos_.at(i).select_mv_stmt_ = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ObTransformMVRewrite::need_transform(const common::ObIArray<ObParentDMLStmt> &parent_stmts,
|
||||
const int64_t current_level,
|
||||
const ObDMLStmt &stmt,
|
||||
@ -473,7 +497,7 @@ int ObTransformMVRewrite::generate_mv_stmt(MvInfo &mv_info)
|
||||
resolver_ctx.stmt_factory_ = ctx_->stmt_factory_;
|
||||
resolver_ctx.sql_proxy_ = GCTX.sql_proxy_;
|
||||
resolver_ctx.query_ctx_ = &mv_temp_query_ctx_;
|
||||
resolver_ctx.is_for_rt_mv_ = true;
|
||||
// resolver_ctx.is_for_rt_mv_ = true;
|
||||
trans_ctx = *ctx_;
|
||||
trans_ctx.reset();
|
||||
ObSelectResolver select_resolver(resolver_ctx);
|
||||
@ -515,8 +539,75 @@ int ObTransformMVRewrite::generate_mv_stmt(MvInfo &mv_info)
|
||||
} else {
|
||||
LOG_DEBUG("generate mv stmt", KPC(mv_info.view_stmt_));
|
||||
}
|
||||
|
||||
// resolve "select 1 from mv" for mv with multi partitions
|
||||
bool has_multi_part = false;
|
||||
if (OB_FAIL(ret)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(check_mv_has_multi_part(mv_info, has_multi_part))) {
|
||||
LOG_WARN("failed to check mv has multi part", K(ret));
|
||||
} else if (has_multi_part) {
|
||||
ObParser mv_sql_parser(*ctx_->allocator_, ctx_->session_info_->get_sql_mode(), ctx_->session_info_->get_charsets4parser());
|
||||
ObSelectResolver mv_sql_select_resolver(resolver_ctx);
|
||||
ObSqlString mv_sql; // select 1 from mv;
|
||||
if (OB_FAIL(ret)) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(mv_sql.assign_fmt(lib::is_oracle_mode() ?
|
||||
"SELECT /*+no_mv_rewrite*/ 1 FROM \"%.*s\".\"%.*s\"" :
|
||||
"SELECT /*+no_mv_rewrite*/ 1 FROM `%.*s`.`%.*s`",
|
||||
mv_info.db_schema_->get_database_name_str().length(), mv_info.db_schema_->get_database_name_str().ptr(),
|
||||
mv_info.mv_schema_->get_table_name_str().length(), mv_info.mv_schema_->get_table_name_str().ptr()))) {
|
||||
LOG_WARN("failed to assign sql", K(ret));
|
||||
} else if (OB_FAIL(mv_sql_parser.parse(ObString::make_string(mv_sql.ptr()), parse_result))) {
|
||||
LOG_WARN("parse view definition failed", K(mv_sql), K(ret));
|
||||
} else if (OB_ISNULL(node = parse_result.result_tree_->children_[0]) || OB_UNLIKELY(T_SELECT != node->type_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid mv select node", K(ret), K(node), K(node->type_));
|
||||
} else if (OB_FALSE_IT(resolver_ctx.query_ctx_->question_marks_count_
|
||||
= static_cast<int64_t>(parse_result.question_mark_ctx_.count_))) {
|
||||
} else if (OB_FAIL(mv_sql_select_resolver.resolve(*node))) {
|
||||
LOG_WARN("resolve view definition failed", K(ret));
|
||||
} else if (OB_ISNULL(mv_info.select_mv_stmt_ = static_cast<ObSelectStmt*>(mv_sql_select_resolver.get_basic_stmt()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("invalid mv stmt", K(ret), K(mv_info.select_mv_stmt_));
|
||||
}
|
||||
} else {
|
||||
mv_info.select_mv_stmt_ = NULL;
|
||||
}
|
||||
|
||||
RESUME_OPT_TRACE;
|
||||
OPT_TRACE(mv_info.mv_schema_->get_table_name(), ":", view_stmt);
|
||||
OPT_TRACE(mv_info.mv_schema_->get_table_name(), ":", mv_info.view_stmt_);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTransformMVRewrite::check_mv_has_multi_part(const MvInfo &mv_info,
|
||||
bool &has_multi_part)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
has_multi_part = false;
|
||||
ObSEArray<ObAuxTableMetaInfo, 8> simple_index_infos;
|
||||
if (OB_ISNULL(ctx_) || OB_ISNULL(ctx_->schema_checker_)
|
||||
|| OB_ISNULL(ctx_->session_info_) || OB_ISNULL(mv_info.data_table_schema_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null", K(ret), K(ctx_), K(mv_info));
|
||||
} else if (mv_info.data_table_schema_->get_part_level() != PARTITION_LEVEL_ZERO) {
|
||||
has_multi_part = true;
|
||||
} else if (OB_FAIL(mv_info.data_table_schema_->get_simple_index_infos(simple_index_infos))) {
|
||||
LOG_WARN("failed to get simple index infos", K(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && !has_multi_part && i < simple_index_infos.count(); ++i) {
|
||||
const ObTableSchema *index_schema = NULL;
|
||||
if (OB_FAIL(ctx_->schema_checker_->get_table_schema(ctx_->session_info_->get_effective_tenant_id(), simple_index_infos.at(i).table_id_, index_schema))) {
|
||||
LOG_WARN("get index schema from schema checker failed", K(ret), K(simple_index_infos.at(i).table_id_));
|
||||
} else if (OB_ISNULL(index_schema)) {
|
||||
ret = OB_TABLE_NOT_EXIST;
|
||||
LOG_WARN("index table not exists", K(simple_index_infos.at(i).table_id_));
|
||||
} else if (index_schema->is_final_invalid_index() || !index_schema->is_global_index_table()) {
|
||||
//do nothing
|
||||
} else if (index_schema->get_part_level() != PARTITION_LEVEL_ZERO) {
|
||||
has_multi_part = true;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -705,10 +796,16 @@ int ObTransformMVRewrite::do_transform_use_one_mv(ObSelectStmt *origin_stmt,
|
||||
LOG_WARN("failed to expand real time mv", K(ret));
|
||||
} else if (OB_FAIL(helper.new_stmt_->formalize_stmt(ctx_->session_info_))) {
|
||||
LOG_WARN("failed to formalize stmt info", K(ret));
|
||||
} else {
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (is_valid) {
|
||||
new_stmt = helper.new_stmt_;
|
||||
is_valid_transform = true;
|
||||
OPT_TRACE("generate rewrite stmt use", mv_info.mv_schema_->get_table_name(), ":", new_stmt);
|
||||
} else if (OB_FAIL(ObTransformUtils::free_stmt(*ctx_->stmt_factory_, helper.new_stmt_))) {
|
||||
LOG_WARN("failed to free stmt", K(ret));
|
||||
} else {
|
||||
helper.new_stmt_ = NULL;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -869,6 +966,45 @@ int ObTransformMVRewrite::create_mv_column_item(const MvInfo &mv_info,
|
||||
LOG_WARN("failed to add replaced expr", K(ret), KPC(mv_info.view_stmt_->get_select_item(i).expr_), KPC(col_expr));
|
||||
}
|
||||
}
|
||||
// fill part expr
|
||||
if (OB_SUCC(ret) && NULL != mv_info.select_mv_stmt_
|
||||
&& mv_info.select_mv_stmt_->get_part_exprs().count() > 0) {
|
||||
ObRawExprCopier part_expr_copier(*ctx_->expr_factory_); // FROM select_mv_stmt_ TO new_stmt_
|
||||
ObSEArray<ObColumnRefRawExpr*, 4> mv_col_exprs; // column expr in select_mv_stmt_
|
||||
// make part_expr_copier which maps select_mv_stmt_ columns to new_stmt_ columns
|
||||
if (OB_FAIL(mv_info.select_mv_stmt_->get_column_exprs(mv_col_exprs))) {
|
||||
LOG_WARN("failed to get column exprs", K(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < mv_col_exprs.count(); ++i) {
|
||||
ObColumnRefRawExpr *col_expr = NULL; // column expr in new_stmt_
|
||||
if (OB_ISNULL(mv_col_exprs.at(i))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("column expr is null", K(ret), K(i));
|
||||
} else if (OB_ISNULL(col_expr = helper.new_stmt_->get_column_expr_by_id(helper.mv_item_->table_id_, mv_col_exprs.at(i)->get_column_id()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get column id", K(ret), K(helper.mv_item_->table_id_), K(i));
|
||||
} else if (OB_FAIL(part_expr_copier.add_replaced_expr(mv_col_exprs.at(i), col_expr))) {
|
||||
LOG_WARN("failed to add replaced expr", K(ret), KPC(mv_col_exprs.at(i)), KPC(col_expr));
|
||||
}
|
||||
}
|
||||
// do fill part expr for new_stmt_
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < mv_info.select_mv_stmt_->get_part_exprs().count(); ++i) {
|
||||
ObDMLStmt::PartExprItem &part_item = mv_info.select_mv_stmt_->get_part_exprs().at(i);
|
||||
ObRawExpr *part_expr = NULL;
|
||||
ObRawExpr *subpart_expr = NULL;
|
||||
if (OB_FAIL(part_expr_copier.copy_on_replace(part_item.part_expr_, part_expr))) {
|
||||
LOG_WARN("failed to copy part expr", K(ret), K(part_item));
|
||||
} else if (NULL != part_item.subpart_expr_
|
||||
&& OB_FAIL(part_expr_copier.copy_on_replace(part_item.subpart_expr_, subpart_expr))) {
|
||||
LOG_WARN("failed to copy subpart expr", K(ret), K(part_item));
|
||||
} else if (OB_FAIL(helper.new_stmt_->set_part_expr(helper.mv_item_->table_id_,
|
||||
part_item.index_tid_,
|
||||
part_expr,
|
||||
subpart_expr))) {
|
||||
LOG_WARN("set part expr to new stmt failed", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -62,7 +62,7 @@ public:
|
||||
: ObTransformRule(ctx, TransMethod::PRE_ORDER, T_MV_REWRITE),
|
||||
is_mv_info_generated_(false),
|
||||
mv_stmt_gen_count_(0) {}
|
||||
virtual ~ObTransformMVRewrite() {}
|
||||
virtual ~ObTransformMVRewrite();
|
||||
virtual int transform_one_stmt(common::ObIArray<ObParentDMLStmt> &parent_stmts,
|
||||
ObDMLStmt *&stmt,
|
||||
bool &trans_happened) override;
|
||||
@ -77,7 +77,8 @@ private:
|
||||
mv_schema_(NULL),
|
||||
data_table_schema_(NULL),
|
||||
db_schema_(NULL),
|
||||
view_stmt_(NULL) {}
|
||||
view_stmt_(NULL),
|
||||
select_mv_stmt_(NULL) {}
|
||||
|
||||
MvInfo(uint64_t mv_id,
|
||||
uint64_t data_table_id,
|
||||
@ -90,7 +91,8 @@ private:
|
||||
mv_schema_(mv_schema),
|
||||
data_table_schema_(data_table_schema),
|
||||
db_schema_(db_schema),
|
||||
view_stmt_(view_stmt) {}
|
||||
view_stmt_(view_stmt),
|
||||
select_mv_stmt_(NULL) {}
|
||||
|
||||
TO_STRING_KV(
|
||||
K_(mv_id),
|
||||
@ -98,7 +100,8 @@ private:
|
||||
K_(mv_schema),
|
||||
K_(data_table_schema),
|
||||
K_(db_schema),
|
||||
K_(view_stmt)
|
||||
K_(view_stmt),
|
||||
K_(select_mv_stmt)
|
||||
);
|
||||
|
||||
uint64_t mv_id_;
|
||||
@ -107,6 +110,7 @@ private:
|
||||
const ObTableSchema *data_table_schema_;
|
||||
const ObDatabaseSchema *db_schema_;
|
||||
ObSelectStmt *view_stmt_;
|
||||
ObSelectStmt *select_mv_stmt_;
|
||||
};
|
||||
|
||||
struct GenerateStmtHelper {
|
||||
@ -208,6 +212,9 @@ private:
|
||||
bool &is_match_index);
|
||||
int add_param_constraint(const ObStmtMapInfo &map_info);
|
||||
|
||||
int check_mv_has_multi_part(const MvInfo &mv_info,
|
||||
bool &has_multi_part);
|
||||
|
||||
private:
|
||||
ObSEArray<MvInfo, 4> mv_infos_;
|
||||
bool is_mv_info_generated_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user