diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index b4c8ee961e..4c54f76166 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -4969,7 +4969,7 @@ int ObStaticEngineCG::generate_spec( } } if (OB_SUCC(ret)) { - spec.enable_das_batch_rescans_ = op.enable_das_batch_rescans(); + spec.enable_das_group_rescan_ = op.enable_das_group_rescan(); } return ret; } diff --git a/src/sql/engine/subquery/ob_subplan_filter_op.cpp b/src/sql/engine/subquery/ob_subplan_filter_op.cpp index bc7c8ec0b6..772d001ba2 100644 --- a/src/sql/engine/subquery/ob_subplan_filter_op.cpp +++ b/src/sql/engine/subquery/ob_subplan_filter_op.cpp @@ -376,7 +376,7 @@ ObSubPlanFilterSpec::ObSubPlanFilterSpec(ObIAllocator &alloc, const ObPhyOperato exec_param_array_(alloc), exec_param_idxs_inited_(false), enable_px_batch_rescans_(alloc), - enable_das_batch_rescans_(false), + enable_das_group_rescan_(false), filter_exprs_(alloc), output_exprs_(alloc), left_rescan_params_(alloc), @@ -393,7 +393,7 @@ OB_SERIALIZE_MEMBER((ObSubPlanFilterSpec, ObOpSpec), exec_param_array_, exec_param_idxs_inited_, enable_px_batch_rescans_, - enable_das_batch_rescans_, + enable_das_group_rescan_, filter_exprs_, output_exprs_, left_rescan_params_, @@ -492,7 +492,7 @@ int ObSubPlanFilterOp::rescan() } if (OB_SUCC(ret) && - (MY_SPEC.enable_das_batch_rescans_ || enable_left_px_batch_)) { + (MY_SPEC.enable_das_group_rescan_ || enable_left_px_batch_)) { left_rows_.reset(); left_rows_iter_.reset(); is_left_end_ = false; @@ -500,7 +500,7 @@ int ObSubPlanFilterOp::rescan() last_store_row_.reset(); } - if (OB_SUCC(ret) && MY_SPEC.enable_das_batch_rescans_) { + if (OB_SUCC(ret) && MY_SPEC.enable_das_group_rescan_) { //We do not need alloc memory again in rescan. //das_batch_params_.reset(); current_group_ = 0; @@ -514,7 +514,7 @@ int ObSubPlanFilterOp::rescan() brs_holder_.reset(); } - if (!MY_SPEC.enable_das_batch_rescans_) { + if (!MY_SPEC.enable_das_group_rescan_) { for (int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) { if (OB_FAIL(children_[i]->rescan())) { LOG_WARN("rescan child operator failed", K(ret), @@ -652,7 +652,7 @@ int ObSubPlanFilterOp::inner_open() } //BATCH SUBPLAN FILTER { - if (OB_SUCC(ret) && MY_SPEC.enable_das_batch_rescans_) { + if (OB_SUCC(ret) && MY_SPEC.enable_das_group_rescan_) { max_group_size_ = OB_MAX_BULK_JOIN_ROWS; if(OB_FAIL(alloc_das_batch_params(max_group_size_+MY_SPEC.max_batch_size_))) { LOG_WARN("Fail to alloc das batch params.", K(ret)); @@ -661,7 +661,7 @@ int ObSubPlanFilterOp::inner_open() //} BATCH SUBPLAN FILTER END //left_rows used by px_batch and das batch. if (OB_SUCC(ret) && - (enable_left_px_batch_ || MY_SPEC.enable_das_batch_rescans_) && + (enable_left_px_batch_ || MY_SPEC.enable_das_group_rescan_) && OB_ISNULL(last_store_row_mem_)) { ObSQLSessionInfo *session = ctx_.get_my_session(); uint64_t tenant_id =session->get_effective_tenant_id(); @@ -693,7 +693,7 @@ int ObSubPlanFilterOp::inner_close() { destroy_subplan_iters(); destroy_update_set_mem(); - if (MY_SPEC.enable_das_batch_rescans_) { + if (MY_SPEC.enable_das_group_rescan_) { das_batch_params_.reset(); } return OB_SUCCESS; @@ -726,14 +726,14 @@ int ObSubPlanFilterOp::handle_next_row() OZ(prepare_onetime_exprs()); } if (OB_FAIL(ret)) { - } else if (enable_left_px_batch_ || MY_SPEC.enable_das_batch_rescans_) { + } else if (enable_left_px_batch_ || MY_SPEC.enable_das_group_rescan_) { //DAS batch spf is conflict with PX batch spf - OB_ASSERT(!(enable_left_px_batch_ && MY_SPEC.enable_das_batch_rescans_)); + OB_ASSERT(!(enable_left_px_batch_ && MY_SPEC.enable_das_group_rescan_)); bool has_row = false; int batch_count = 0; - batch_count = MY_SPEC.enable_das_batch_rescans_ ? max_group_size_ : PX_RESCAN_BATCH_ROW_COUNT; + batch_count = MY_SPEC.enable_das_group_rescan_ ? max_group_size_ : PX_RESCAN_BATCH_ROW_COUNT; if (left_rows_iter_.is_valid() && left_rows_iter_.has_next()) { - if(MY_SPEC.enable_das_batch_rescans_) { + if(MY_SPEC.enable_das_group_rescan_) { //das batch branch //Consume the remaining batch data in left store. current_group_++; @@ -749,7 +749,7 @@ int ObSubPlanFilterOp::handle_next_row() if(enable_left_px_batch_) { batch_rescan_ctl_.reuse(); } - if (MY_SPEC.enable_das_batch_rescans_) { + if (MY_SPEC.enable_das_group_rescan_) { current_group_ = 0; //Always OB_SUCCESS in current implement. if(OB_FAIL(init_das_batch_params())) { @@ -788,7 +788,7 @@ int ObSubPlanFilterOp::handle_next_row() LOG_WARN("fail to add row", K(ret)); } else if (enable_left_px_batch_ && OB_FAIL(prepare_rescan_params(true))) { LOG_WARN("fail to prepare rescan params", K(ret)); - } else if (MY_SPEC.enable_das_batch_rescans_ && OB_FAIL(deep_copy_dynamic_obj())) { + } else if (MY_SPEC.enable_das_group_rescan_ && OB_FAIL(deep_copy_dynamic_obj())) { LOG_WARN("fail to deep copy dynamic obj", K(ret)); } else { has_row = true; @@ -807,7 +807,7 @@ int ObSubPlanFilterOp::handle_next_row() ret = OB_SUCCESS; OZ(left_rows_.finish_add_row(false)); OZ(left_rows_.begin(left_rows_iter_)); - if (MY_SPEC.enable_das_batch_rescans_) { + if (MY_SPEC.enable_das_group_rescan_) { //Lazy batch rescan right iterator. //Just set the flag, do the rescan when call the iter->rewind(). for(int32_t i = 1; OB_SUCC(ret) && i < child_cnt_; ++i) { @@ -828,7 +828,7 @@ int ObSubPlanFilterOp::handle_next_row() OZ(fill_cur_row_rescan_param()); } else { //das batch spf branch - OB_ASSERT(MY_SPEC.enable_das_batch_rescans_); + OB_ASSERT(MY_SPEC.enable_das_group_rescan_); if (OB_FAIL(fill_cur_row_das_batch_param(eval_ctx_, current_group_))) { LOG_WARN("Filed to prepare das batch rescan params", K(ret)); } @@ -1129,7 +1129,7 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt) clear_evaluated_flag(); if(OB_FAIL(ret)) { LOG_WARN("prepare_onetime_expr fail.", K(ret)); - } else if (MY_SPEC.enable_das_batch_rescans_) { + } else if (MY_SPEC.enable_das_group_rescan_) { if (OB_FAIL(handle_next_batch_with_group_rescan(op_max_batch_size))) { LOG_WARN("handle_next_batch_with_group_rescan failed", K(ret)); } diff --git a/src/sql/engine/subquery/ob_subplan_filter_op.h b/src/sql/engine/subquery/ob_subplan_filter_op.h index 4fcf1f9c80..eaba5045b9 100644 --- a/src/sql/engine/subquery/ob_subplan_filter_op.h +++ b/src/sql/engine/subquery/ob_subplan_filter_op.h @@ -165,7 +165,7 @@ public: bool exec_param_idxs_inited_; // 标记每个子查询是否可以做px batch rescan common::ObFixedArray enable_px_batch_rescans_; - bool enable_das_batch_rescans_; + bool enable_das_group_rescan_; ExprFixedArray filter_exprs_; ExprFixedArray output_exprs_; common::ObFixedArray left_rescan_params_; @@ -208,7 +208,7 @@ public: int fill_cur_row_das_batch_param(ObEvalCtx& eval_ctx, uint64_t current_group) const; int bind_das_batch_params_to_store() const; void get_current_group(uint64_t& current_group) const; - bool enable_left_das_batch() const {return MY_SPEC.enable_das_batch_rescans_;} + bool enable_left_das_batch() const {return MY_SPEC.enable_das_group_rescan_;} //for DAS batch SPF end const ObSubPlanFilterSpec &get_spec() const diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index 8c83cacad7..1040a5b194 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -11586,7 +11586,7 @@ int ObLogPlan::adjust_final_plan_info(ObLogicalOperator *&op) OB_FAIL(static_cast(op)->check_and_set_use_batch())) { LOG_WARN("failed to set use batch nlj", K(ret)); } else if (log_op_def::LOG_SUBPLAN_FILTER == op->get_type() && - OB_FAIL(static_cast(op)->check_and_set_use_batch())) { + OB_FAIL(static_cast(op)->check_and_set_das_group_rescan())) { LOG_WARN("failed to set use batch spf", K(ret)); } else { /*do nothing*/ } } diff --git a/src/sql/optimizer/ob_log_subplan_filter.cpp b/src/sql/optimizer/ob_log_subplan_filter.cpp index da100b7b34..e020cd619a 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.cpp +++ b/src/sql/optimizer/ob_log_subplan_filter.cpp @@ -183,7 +183,7 @@ int ObLogSubPlanFilter::get_plan_item_info(PlanText &plan_text, } else if (OB_FAIL(BUF_PRINTF(", "))) { LOG_WARN("BUF_PRINTF fails", K(ret)); } else if (OB_FAIL(BUF_PRINTF("use_batch=%s", - enable_das_batch_rescans_ ? "true" : "false"))) { + enable_das_group_rescan_ ? "true" : "false"))) { LOG_WARN("BUF_PRINTF fails", K(ret)); } else { /* Do nothing */ } END_BUF_PRINT(plan_item.special_predicates_, @@ -441,8 +441,8 @@ int ObLogSubPlanFilter::compute_sharding_info() return ret; } -int ObLogSubPlanFilter::check_if_match_das_batch_rescan(ObLogicalOperator *root, - bool &enable_das_batch_rescans) +int ObLogSubPlanFilter::check_if_match_das_group_rescan(ObLogicalOperator *root, + bool &group_rescan) { int ret = OB_SUCCESS; if (OB_ISNULL(root)) { @@ -450,26 +450,52 @@ int ObLogSubPlanFilter::check_if_match_das_batch_rescan(ObLogicalOperator *root, LOG_WARN("unexpected null", K(ret)); } else if (root->is_table_scan()) { bool is_valid = false; - ObLogTableScan *tsc = static_cast(root); - if (!tsc->use_das()) { - enable_das_batch_rescans = false; - } else if (OB_FAIL(ObOptimizerUtil::check_contribute_query_range(root, - get_exec_params(), - is_valid))) { - LOG_WARN("failed to check query range contribution", K(ret)); - } else if (!is_valid) { - enable_das_batch_rescans = false; - } else if (tsc->get_scan_direction() != default_asc_direction()) { - enable_das_batch_rescans = false; - } else if (tsc->has_index_scan_filter() && tsc->get_index_back() && tsc->get_is_index_global()) { - // For the global index lookup, if there is a pushdown filter when scanning the index, - // batch cannot be used. - enable_das_batch_rescans = false; - } else {/*do nothing*/} + ObLogTableScan *tsc = NULL; + ObLogPlan *plan = NULL; + const AccessPath *ap = NULL; + const TableItem *table_item = NULL; + if (OB_ISNULL(tsc = static_cast(root)) + // tsc might belong to a different subquery + // with its own plan + || OB_ISNULL(plan = tsc->get_plan()) + || OB_ISNULL(ap = tsc->get_access_path()) + || OB_ISNULL(table_item = plan->get_stmt()->get_table_item_by_id(ap->table_id_))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null", K(ret)); + } else if (!tsc->use_das()) { + group_rescan = false; + } + if (OB_SUCC(ret) && group_rescan) { + group_rescan = !(is_virtual_table(ap->ref_table_id_) + || table_item->is_link_table() + || ap->is_cte_path() + || ap->is_function_table_path() + || ap->is_temp_table_path() + || ap->is_json_table_path() + || table_item->for_update_ + || !ap->subquery_exprs_.empty() + || EXTERNAL_TABLE == table_item->table_type_ + ); + } + if (OB_SUCC(ret) && group_rescan) { + if (OB_FAIL(ObOptimizerUtil::check_contribute_query_range(root, + get_exec_params(), + is_valid))) { + LOG_WARN("failed to check query range contribution", K(ret)); + } else if (!is_valid) { + group_rescan = false; + } else if (tsc->get_scan_direction() != default_asc_direction()) { + group_rescan = false; + } else if (tsc->has_index_scan_filter() && tsc->get_index_back() && tsc->get_is_index_global()) { + // For the global index lookup, if there is a pushdown filter when scanning the index, + // batch cannot be used. + group_rescan = false; + } else {/*do nothing*/} + } } else if (root->get_num_of_child() == 1) { if (OB_SUCC(ret)) { - if (OB_FAIL(SMART_CALL(check_if_match_das_batch_rescan(root->get_child(0), - enable_das_batch_rescans)))) { + if (OB_FAIL(SMART_CALL(check_if_match_das_group_rescan(root->get_child(0), + group_rescan)))) { LOG_WARN("failed to check match das batch rescan", K(ret)); } } @@ -486,7 +512,7 @@ int ObLogSubPlanFilter::set_use_das_batch(ObLogicalOperator* root) } else if (root->is_table_scan()) { ObLogTableScan *ts = static_cast(root); if (!ts->get_range_conditions().empty()) { - ts->set_use_batch(enable_das_batch_rescans_); + ts->set_use_batch(enable_das_group_rescan_); } } else if (root->get_num_of_child() == 1) { if(OB_FAIL(SMART_CALL(set_use_das_batch(root->get_child(first_child))))) { @@ -496,7 +522,7 @@ int ObLogSubPlanFilter::set_use_das_batch(ObLogicalOperator* root) return ret; } -int ObLogSubPlanFilter::check_and_set_use_batch() +int ObLogSubPlanFilter::check_and_set_das_group_rescan() { int ret = OB_SUCCESS; ObSQLSessionInfo *session_info = NULL; @@ -505,28 +531,28 @@ int ObLogSubPlanFilter::check_and_set_use_batch() || OB_ISNULL(session_info = plan->get_optimizer_context().get_session_info())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null", K(ret)); - } else if (OB_FAIL(session_info->get_nlj_batching_enabled(enable_das_batch_rescans_))) { + } else if (OB_FAIL(session_info->get_nlj_batching_enabled(enable_das_group_rescan_))) { LOG_WARN("failed to get enable batch variable", K(ret)); } // check use batch - for (int64_t i = 1; OB_SUCC(ret) && enable_das_batch_rescans_ && i < get_num_of_child(); i++) { + for (int64_t i = 1; OB_SUCC(ret) && enable_das_group_rescan_ && i < get_num_of_child(); i++) { ObLogicalOperator *child = get_child(i); bool contains_invalid_startup = false; if (OB_ISNULL(child)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null", K(ret)); } else if (get_initplan_idxs().has_member(i) || get_onetime_idxs().has_member(i)) { - enable_das_batch_rescans_ = false; + enable_das_group_rescan_ = false; } else if (!(child->get_type() == log_op_def::LOG_TABLE_SCAN || child->get_type() == log_op_def::LOG_SUBPLAN_SCAN)) { - enable_das_batch_rescans_ = false; - } else if (OB_FAIL(check_if_match_das_batch_rescan(child, enable_das_batch_rescans_))) { + enable_das_group_rescan_ = false; + } else if (OB_FAIL(check_if_match_das_group_rescan(child, enable_das_group_rescan_))) { LOG_WARN("failed to check match das batch rescan", K(ret)); - } else if (enable_das_batch_rescans_) { + } else if (enable_das_group_rescan_) { if (OB_FAIL(plan->contains_startup_with_exec_param(child, contains_invalid_startup))) { LOG_WARN("failed to check contains invalid startup", K(ret)); } else if (contains_invalid_startup) { - enable_das_batch_rescans_ = false; + enable_das_group_rescan_ = false; } } } @@ -540,7 +566,7 @@ int ObLogSubPlanFilter::check_and_set_use_batch() LOG_WARN("failed to set use das batch rescan", K(ret)); } } - LOG_TRACE("spf das batch rescan", K(ret), K(enable_das_batch_rescans_)); + LOG_TRACE("spf das batch rescan", K(ret), K(enable_das_group_rescan_)); return ret; } diff --git a/src/sql/optimizer/ob_log_subplan_filter.h b/src/sql/optimizer/ob_log_subplan_filter.h index 8ee4d9c7c1..659c7a400b 100644 --- a/src/sql/optimizer/ob_log_subplan_filter.h +++ b/src/sql/optimizer/ob_log_subplan_filter.h @@ -32,7 +32,7 @@ public: init_plan_idxs_(), one_time_idxs_(), update_set_(false), - enable_das_batch_rescans_(false) + enable_das_group_rescan_(false) {} ~ObLogSubPlanFilter() {} virtual int est_cost() override; @@ -101,11 +101,10 @@ public: int add_px_batch_rescan_flag(bool flag) { return enable_px_batch_rescans_.push_back(flag); } common::ObIArray &get_px_batch_rescans() { return enable_px_batch_rescans_; } - inline bool enable_das_batch_rescans() { return enable_das_batch_rescans_; } - inline void set_enable_das_batch_rescans(bool flag) { enable_das_batch_rescans_ = flag; } - int check_and_set_use_batch(); - int check_if_match_das_batch_rescan(ObLogicalOperator *root, - bool &enable_das_batch_rescans); + inline bool enable_das_group_rescan() { return enable_das_group_rescan_; } + inline void set_enable_das_group_rescan(bool flag) { enable_das_group_rescan_ = flag; } + int check_and_set_das_group_rescan(); + int check_if_match_das_group_rescan(ObLogicalOperator *root, bool &group_rescan); int set_use_das_batch(ObLogicalOperator* root); int allocate_startup_expr_post() override; @@ -154,7 +153,7 @@ protected: common::ObSEArray above_pushdown_left_params_; common::ObSEArray above_pushdown_right_params_; - bool enable_das_batch_rescans_; + bool enable_das_group_rescan_; private: DISALLOW_COPY_AND_ASSIGN(ObLogSubPlanFilter); };