[CP] implement preserve order for pagination

This commit is contained in:
zzg19950727 2024-03-21 09:46:05 +00:00 committed by ob-robot
parent 1d0a5c685e
commit 7956384fd3
8 changed files with 306 additions and 4 deletions

View File

@ -1863,6 +1863,10 @@ DEF_BOOL(enable_rpc_authentication_bypass, OB_CLUSTER_PARAMETER, "True",
DEF_INT(_checkpoint_diagnose_preservation_count, OB_TENANT_PARAMETER, "100", "[0,800]",
"the count of checkpoint diagnose info preservation",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_preserve_order_for_pagination, OB_TENANT_PARAMETER, "False",
"enable preserver order for limit",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(max_partition_num, OB_TENANT_PARAMETER, "8192", "[8192, 65536]",
"set max partition num in mysql mode",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -818,6 +818,11 @@ bool ObOptParamHint::is_param_val_valid(const OptParamType param_type, const ObO
|| 0 == val.get_varchar().case_compare("false"));
break;
}
case PRESERVE_ORDER_FOR_PAGINATION: {
is_valid = val.is_varchar() && (0 == val.get_varchar().case_compare("true")
|| 0 == val.get_varchar().case_compare("false"));
break;
}
default:
LOG_TRACE("invalid opt param val", K(param_type), K(val));
break;

View File

@ -108,6 +108,7 @@ struct ObOptParamHint
DEF(WORKAREA_SIZE_POLICY,) \
DEF(ENABLE_RICH_VECTOR_FORMAT,) \
DEF(_ENABLE_STORAGE_CARDINALITY_ESTIMATION,) \
DEF(PRESERVE_ORDER_FOR_PAGINATION,) \
DECLARE_ENUM(OptParamType, opt_param, OPT_PARAM_TYPE_DEF, static);

View File

@ -60,6 +60,7 @@ int ObTransformPreProcess::transform_one_stmt(common::ObIArray<ObParentDMLStmt>
int ret = OB_SUCCESS;
trans_happened = false;
bool is_happened = false;
ObDMLStmt *limit_stmt = NULL;
if (OB_ISNULL(stmt) || OB_ISNULL(ctx_) || OB_ISNULL(ctx_->allocator_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("unexpected NULL", K(ret), K(stmt), K(ctx_));
@ -261,7 +262,7 @@ int ObTransformPreProcess::transform_one_stmt(common::ObIArray<ObParentDMLStmt>
}
if (OB_SUCC(ret)) {
if (OB_FAIL(transform_rownum_as_limit_offset(parent_stmts, stmt, is_happened))) {
if (OB_FAIL(transform_rownum_as_limit_offset(parent_stmts, stmt, limit_stmt, is_happened))) {
LOG_WARN("failed to transform rownum as limit", K(ret));
} else {
trans_happened |= is_happened;
@ -269,6 +270,17 @@ int ObTransformPreProcess::transform_one_stmt(common::ObIArray<ObParentDMLStmt>
LOG_TRACE("succeed to transform rownum as limit", K(is_happened));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(preserve_order_for_pagination(NULL == limit_stmt ? stmt : limit_stmt, is_happened))) {
LOG_WARN("failed to preserve order for pagination", K(ret));
} else {
trans_happened |= is_happened;
OPT_TRACE("preserve order for pagination:", is_happened);
LOG_TRACE("succeed to preserve order for pagination", K(is_happened));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(transform_groupingsets_rollup_cube(stmt, is_happened))) {
LOG_WARN("failed to transform for transform for grouping sets, rollup and cube.", K(ret));
@ -6309,6 +6321,7 @@ int ObTransformPreProcess::transformer_aggr_expr(ObDMLStmt *stmt,
int ObTransformPreProcess::transform_rownum_as_limit_offset(
const ObIArray<ObParentDMLStmt> &parent_stmts,
ObDMLStmt *&stmt,
ObDMLStmt *&limit_stmt,
bool &trans_happened)
{
int ret = OB_SUCCESS;
@ -6316,7 +6329,7 @@ int ObTransformPreProcess::transform_rownum_as_limit_offset(
bool is_rownum_happened = false;
bool is_generated_rownum_happened = false;
trans_happened = false;
if (OB_FAIL(transform_common_rownum_as_limit(stmt, is_rownum_happened))) {
if (OB_FAIL(transform_common_rownum_as_limit(stmt, limit_stmt, is_rownum_happened))) {
LOG_WARN("failed to transform common rownum as limit", K(ret));
} else if (OB_FAIL(transform_generated_rownum_as_limit(parent_stmts, stmt,
is_generated_rownum_happened))) {
@ -6334,7 +6347,9 @@ int ObTransformPreProcess::transform_rownum_as_limit_offset(
* => select * from (select * from t where ... limit ?) order by c1;
*
**/
int ObTransformPreProcess::transform_common_rownum_as_limit(ObDMLStmt *&stmt, bool &trans_happened)
int ObTransformPreProcess::transform_common_rownum_as_limit(ObDMLStmt *&stmt,
ObDMLStmt *&limit_stmt,
bool &trans_happened)
{
int ret = OB_SUCCESS;
trans_happened = false;
@ -6367,6 +6382,7 @@ int ObTransformPreProcess::transform_common_rownum_as_limit(ObDMLStmt *&stmt, bo
LOG_WARN("get unexpected null", K(ret), K(child_stmt));
} else {
child_stmt->set_limit_offset(limit_expr, NULL);
limit_stmt = child_stmt;
}
return ret;
}
@ -10489,5 +10505,247 @@ int ObTransformPreProcess::check_can_transform_insert_only_merge_into(const ObMe
return ret;
}
int ObTransformPreProcess::preserve_order_for_pagination(ObDMLStmt *stmt,
bool &trans_happened)
{
int ret = OB_SUCCESS;
trans_happened = false;
bool is_valid = false;
ObSEArray<ObSelectStmt*, 2> preserve_order_stmts;
if (OB_FAIL(check_stmt_need_preserve_order(stmt,
preserve_order_stmts,
is_valid))) {
LOG_WARN("failed to check stmt need add order by", K(ret));
} else if (!is_valid) {
//do nothing
} else {
bool happened = false;
for (int64_t i = 0; OB_SUCC(ret) && i < preserve_order_stmts.count(); ++i) {
if (OB_FAIL(add_order_by_for_stmt(preserve_order_stmts.at(i), happened))) {
LOG_WARN("failed to add order by for stmt", K(ret));
} else {
trans_happened |= happened;
}
}
}
return ret;
}
int ObTransformPreProcess::check_stmt_need_preserve_order(ObDMLStmt *stmt,
ObIArray<ObSelectStmt*> &preserve_order_stmts,
bool &is_valid)
{
int ret = OB_SUCCESS;
is_valid = false;
bool is_hint_enabled = false;
bool has_hint = false;
ObSelectStmt *sel_stmt = NULL;
if (OB_ISNULL(stmt) || OB_ISNULL(ctx_) ||
OB_ISNULL(ctx_->session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null", K(ret), K(stmt), K(ctx_));
} else if (lib::is_oracle_mode()) {
OPT_TRACE("oracle tenant will not preserve order for pagination");
} else if (OB_FAIL(stmt->get_query_ctx()->get_global_hint().opt_params_.get_bool_opt_param(
ObOptParamHint::PRESERVE_ORDER_FOR_PAGINATION, is_hint_enabled, has_hint))) {
LOG_WARN("failed to check has opt param", K(ret));
} else if (has_hint && !is_hint_enabled) {
OPT_TRACE("query hint disable preserve order for pagination");
} else if (OB_FAIL(ctx_->session_info_->is_preserve_order_for_pagination_enabled(is_valid))) {
LOG_WARN("failed to check preserve order for pagination enabled", K(ret));
} else if (!is_valid && !has_hint) {
OPT_TRACE("system config disable preserve order for pagination");
} else if (!stmt->is_select_stmt()) {
OPT_TRACE("dml query can not preserve order for pagination");
} else if (OB_FALSE_IT(sel_stmt=static_cast<ObSelectStmt*>(stmt))) {
} else if (!sel_stmt->has_limit() || NULL != sel_stmt->get_limit_percent_expr()) {
OPT_TRACE("query do not have normal limit offset");
} else if (OB_FALSE_IT(is_valid = true)) {
} else if (sel_stmt->has_order_by() ||
sel_stmt->has_group_by() ||
sel_stmt->has_distinct() ||
sel_stmt->get_aggr_item_size() != 0 ||
sel_stmt->has_window_function() ||
1 != sel_stmt->get_table_items().count()) {
if (OB_FAIL(preserve_order_stmts.push_back(sel_stmt))) {
LOG_WARN("failed to push back stmt", K(ret));
}
} else {
TableItem *table = sel_stmt->get_table_items().at(0);
ObSEArray<ObSelectStmt*, 2> view_preserve_order_stmts;
bool need_preserve = false;
if (OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null table", K(ret));
} else if (!table->is_generated_table()) {
if (OB_FAIL(preserve_order_stmts.push_back(sel_stmt))) {
LOG_WARN("failed to push back stmt", K(ret));
}
} else if (OB_FAIL(check_view_need_preserve_order(table->ref_query_,
view_preserve_order_stmts,
need_preserve))) {
LOG_WARN("failed to check view need preserve_order", K(ret));
} else if (need_preserve &&
OB_FAIL(append(preserve_order_stmts, view_preserve_order_stmts))) {
LOG_WARN("failed to append stmt", K(ret));
} else if (!need_preserve &&
OB_FAIL(preserve_order_stmts.push_back(sel_stmt))) {
LOG_WARN("failed to push back stmt", K(ret));
}
}
return ret;
}
int ObTransformPreProcess::check_view_need_preserve_order(ObSelectStmt* stmt,
ObIArray<ObSelectStmt*> &preserve_order_stmts,
bool &need_preserve)
{
int ret = OB_SUCCESS;
need_preserve = false;
if (OB_ISNULL(stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null stmt", K(ret));
} else if (stmt->has_order_by() && !stmt->has_limit()) {
need_preserve = true;
if (OB_FAIL(preserve_order_stmts.push_back(stmt))) {
LOG_WARN("failed to push back stmt", K(ret));
}
} else if (!stmt->has_order_by() &&
!stmt->has_limit() &&
stmt->is_set_stmt() &&
OB_FAIL(check_set_stmt_need_preserve_order(stmt,
preserve_order_stmts,
need_preserve))) {
LOG_WARN("failed to check set stmt preserve order", K(ret));
} else if (!stmt->is_spj()) {
//do nothing
} else if (1 != stmt->get_table_items().count()) {
//do nothing
} else {
TableItem *table = stmt->get_table_items().at(0);
if (OB_ISNULL(table)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null table", K(ret));
} else if (!table->is_generated_table()) {
//do nothing
} else if (OB_FAIL(SMART_CALL(check_view_need_preserve_order(table->ref_query_,
preserve_order_stmts,
need_preserve)))) {
LOG_WARN("failed to check view need preserve order", K(ret));
}
}
return ret;
}
int ObTransformPreProcess::check_set_stmt_need_preserve_order(ObSelectStmt* stmt,
ObIArray<ObSelectStmt*> &preserve_order_stmts,
bool &need_preserve)
{
int ret = OB_SUCCESS;
need_preserve = false;
bool force_serial_set_order = false;
if (OB_ISNULL(stmt) || OB_ISNULL(ctx_) ||
OB_ISNULL(ctx_->session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null stmt", K(ret));
} else if (OB_FAIL(ctx_->session_info_->is_serial_set_order_forced(force_serial_set_order,
lib::is_oracle_mode()))) {
LOG_WARN("fail to get force_serial_set_order value", K(ret));
} else if (!force_serial_set_order) {
//do nothing
} else if (!stmt->is_set_stmt() ||
stmt->is_set_distinct() ||
stmt->is_recursive_union()) {
//do nothing
} else {
need_preserve = true;
int64_t N = stmt->get_set_query().count();
for (int64_t i = 0; OB_SUCC(ret) && need_preserve && i < N; ++i) {
ObSelectStmt *set_query = stmt->get_set_query().at(i);
if (OB_ISNULL(set_query)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null stmt", K(ret));
} else if (OB_FAIL(SMART_CALL(check_view_need_preserve_order(set_query,
preserve_order_stmts,
need_preserve)))) {
LOG_WARN("failed to check view need preserve order", K(ret));
}
}
}
return ret;
}
int ObTransformPreProcess::add_order_by_for_stmt(ObSelectStmt* stmt, bool &trans_happened)
{
int ret = OB_SUCCESS;
bool is_valid = false;
ObSEArray<ObRawExpr*, 2> select_exprs;
ObSEArray<ObRawExpr*, 2> order_by_exprs;
ObSEArray<ObRawExpr*, 2> new_order_by_exprs;
if (OB_ISNULL(stmt) || OB_ISNULL(ctx_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null stmt", K(ret));
} else if (OB_FAIL(stmt->get_order_exprs(order_by_exprs))) {
LOG_WARN("failed to get order exprs", K(ret));
} else if (OB_FAIL(ObTransformUtils::check_stmt_unique(stmt,
ctx_->session_info_,
ctx_->schema_checker_,
order_by_exprs,
true,
is_valid))) {
LOG_WARN("failed to check stmt unique on exprs", K(ret));
} else if (is_valid) {
OPT_TRACE("current order by exprs is unique, do not need add extra order by exprs");
} else if (OB_FAIL(get_rowkey_for_single_table(stmt, select_exprs, is_valid))) {
LOG_WARN("failed to get rowkey for single table", K(ret));
} else if (!is_valid &&
OB_FAIL(stmt->get_select_exprs_without_lob(select_exprs))) {
LOG_WARN("failed to get select exprs", K(ret));
} else if (OB_FAIL(ObOptimizerUtil::except_exprs(select_exprs,
order_by_exprs,
new_order_by_exprs))) {
LOG_WARN("failed to except exprs", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < new_order_by_exprs.count(); ++i) {
OrderItem item(new_order_by_exprs.at(i));
if (OB_FAIL(stmt->add_order_item(item))) {
LOG_WARN("failed to add order item", K(ret));
}
}
if (OB_SUCC(ret) && !new_order_by_exprs.empty()) {
trans_happened = true;
}
}
return ret;
}
int ObTransformPreProcess::get_rowkey_for_single_table(ObSelectStmt* stmt,
ObIArray<ObRawExpr*> &unique_keys,
bool &is_valid)
{
int ret = OB_SUCCESS;
is_valid = false;
TableItem *table = NULL;
if (OB_ISNULL(stmt) || OB_ISNULL(ctx_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null stmt", K(ret));
} else if (1 != stmt->get_table_items().count()) {
//do nothing
} else if (OB_ISNULL(table=stmt->get_table_items().at(0))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null table item", K(ret));
} else if (!table->is_basic_table()) {
//do nothing
} else if (OB_FAIL(ObTransformUtils::generate_unique_key_for_basic_table(ctx_,
stmt,
table,
unique_keys))) {
LOG_WARN("failed to generate unique key", K(ret));
} else {
is_valid = true;
}
return ret;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -460,8 +460,9 @@ struct DistinctObjMeta
int transformer_aggr_expr(ObDMLStmt *stmt, bool &trans_happened);
int transform_rownum_as_limit_offset(const ObIArray<ObParentDMLStmt> &parent_stmts,
ObDMLStmt *&stmt,
ObDMLStmt *&limit_stmt,
bool &trans_happened);
int transform_common_rownum_as_limit(ObDMLStmt *&stmt, bool &trans_happened);
int transform_common_rownum_as_limit(ObDMLStmt *&stmt, ObDMLStmt *&limit_stmt, bool &trans_happened);
int try_transform_common_rownum_as_limit_or_false(ObDMLStmt *stmt, ObRawExpr *&limit_expr, bool& is_valid);
int transform_generated_rownum_as_limit(const ObIArray<ObParentDMLStmt> &parent_stmts,
ObDMLStmt *stmt,
@ -654,6 +655,25 @@ struct DistinctObjMeta
int flatten_conditions(ObDMLStmt *stmt, bool &trans_happened);
int recursive_flatten_join_conditions(ObDMLStmt *stmt, TableItem *table, bool &trans_happened);
int do_flatten_conditions(ObDMLStmt *stmt, ObIArray<ObRawExpr*> &conditions, bool &trans_happened);
int preserve_order_for_pagination(ObDMLStmt *stmt,
bool &trans_happened);
int check_stmt_need_preserve_order(ObDMLStmt *stmt,
ObIArray<ObSelectStmt*> &preserve_order_stmts,
bool &is_valid);
int check_view_need_preserve_order(ObSelectStmt* stmt,
ObIArray<ObSelectStmt*> &preserve_order_stmts,
bool &need_preserve);
int check_set_stmt_need_preserve_order(ObSelectStmt* stmt,
ObIArray<ObSelectStmt*> &preserve_order_stmts,
bool &need_preserve);
int add_order_by_for_stmt(ObSelectStmt* stmt, bool &trans_happened);
int get_rowkey_for_single_table(ObSelectStmt* stmt,
ObIArray<ObRawExpr*> &unique_keys,
bool &is_valid);
private:
DISALLOW_COPY_AND_ASSIGN(ObTransformPreProcess);
};

View File

@ -510,6 +510,18 @@ int ObSQLSessionInfo::is_better_inlist_enabled(bool &enabled) const
return ret;
}
int ObSQLSessionInfo::is_preserve_order_for_pagination_enabled(bool &enabled) const
{
int ret = OB_SUCCESS;
enabled = false;
int64_t tenant_id = get_effective_tenant_id();
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (tenant_config.is_valid()) {
enabled = tenant_config->_preserve_order_for_pagination;
}
return ret;
}
bool ObSQLSessionInfo::is_index_skip_scan_enabled() const
{
bool bret = false;

View File

@ -1210,6 +1210,7 @@ public:
bool is_var_assign_use_das_enabled() const;
int is_adj_index_cost_enabled(bool &enabled, int64_t &stats_cost_percent) const;
bool is_spf_mlj_group_rescan_enabled() const;
int is_preserve_order_for_pagination_enabled(bool &enabled) const;
ObSessionDDLInfo &get_ddl_info() { return ddl_info_; }
void set_ddl_info(const ObSessionDDLInfo &ddl_info) { ddl_info_ = ddl_info; }

View File

@ -384,6 +384,7 @@ _parallel_redo_logging_trigger
_parallel_server_sleep_time
_pdml_thread_cache_size
_pipelined_table_function_memory_limit
_preserve_order_for_pagination
_print_sample_ppm
_private_buffer_size
_publish_schema_mode