From c67ffdd3c7d0fa33eecd1c6a5bee894ba0ac1515 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 18 Oct 2024 09:13:39 +0000 Subject: [PATCH] [CP] Adaptive auto dop based on storage layer estimation --- src/share/parameter/ob_parameter_seed.ipp | 3 + src/sql/CMakeLists.txt | 1 + .../code_generator/ob_static_engine_cg.cpp | 2 + src/sql/engine/ob_exec_context.cpp | 5 +- src/sql/engine/ob_exec_context.h | 6 + src/sql/engine/ob_physical_plan.h | 2 + .../px/exchange/ob_px_fifo_coord_op.cpp | 2 +- .../engine/px/exchange/ob_px_ms_coord_op.cpp | 2 +- .../px/exchange/ob_px_ms_coord_vec_op.cpp | 2 +- .../px/exchange/ob_px_ordered_coord_op.cpp | 2 +- src/sql/engine/px/ob_dfo_mgr.cpp | 22 +- src/sql/engine/px/ob_dfo_mgr.h | 2 + src/sql/engine/px/ob_px_coord_op.cpp | 24 +- src/sql/engine/px/ob_px_coord_op.h | 3 +- src/sql/engine/table/ob_table_scan_op.cpp | 3 +- src/sql/engine/table/ob_table_scan_op.h | 9 + src/sql/ob_sql.cpp | 1 + src/sql/ob_sql_utils.cpp | 1 + src/sql/ob_sql_utils.h | 1 + src/sql/optimizer/ob_log_plan.cpp | 2 + src/sql/optimizer/ob_log_table_scan.h | 6 + src/sql/optimizer/ob_opt_est_cost_model.cpp | 158 +++++++ src/sql/optimizer/ob_opt_est_cost_model.h | 52 +++ src/sql/optimizer/ob_optimizer.cpp | 4 +- src/sql/plan_cache/ob_adaptive_auto_dop.cpp | 437 ++++++++++++++++++ src/sql/plan_cache/ob_adaptive_auto_dop.h | 80 ++++ src/sql/plan_cache/ob_plan_cache_struct.h | 7 +- src/sql/plan_cache/ob_plan_cache_util.h | 10 +- src/sql/plan_cache/ob_plan_set.cpp | 73 ++- src/sql/plan_cache/ob_plan_set.h | 6 +- .../all_virtual_sys_parameter_stat.result | 1 + 31 files changed, 900 insertions(+), 29 deletions(-) create mode 100644 src/sql/plan_cache/ob_adaptive_auto_dop.cpp create mode 100644 src/sql/plan_cache/ob_adaptive_auto_dop.h diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index fee8e2250a..6b3a8b9ab1 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -2261,3 +2261,6 @@ ERRSIM_DEF_INT(errsim_rebuild_ls_id, OB_CLUSTER_PARAMETER, "0", "[0,)", ERRSIM_DEF_STR(errsim_rebuild_addr, OB_CLUSTER_PARAMETER, "", "rebuild addr (ip:port)", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_BOOL(_enable_adaptive_auto_dop, OB_CLUSTER_PARAMETER, "False", + "Enable or disable adaptive auto dop feature.", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/sql/CMakeLists.txt b/src/sql/CMakeLists.txt index 3bb4a2eb96..74c95f4179 100644 --- a/src/sql/CMakeLists.txt +++ b/src/sql/CMakeLists.txt @@ -1122,6 +1122,7 @@ ob_set_subtarget(ob_sql plan_cache plan_cache/ob_lib_cache_node_factory.cpp plan_cache/ob_plan_match_helper.cpp plan_cache/ob_values_table_compression.cpp + plan_cache/ob_adaptive_auto_dop.cpp ) ob_set_subtarget(ob_sql printer diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 2722aa0d7e..177e3acbb3 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -225,6 +225,7 @@ int ObStaticEngineCG::generate(const ObLogPlan &log_plan, ObPhysicalPlan &phy_pl LOG_WARN("generated root spec is NULL", K(ret)); } else { phy_plan.set_root_op_spec(root_spec); + phy_plan.set_is_use_auto_dop(opt_ctx_->is_use_auto_dop()); if (OB_FAIL(set_other_properties(log_plan, phy_plan))) { LOG_WARN("set other properties failed", K(ret)); } @@ -5154,6 +5155,7 @@ int ObStaticEngineCG::generate_normal_tsc(ObLogTableScan &op, ObTableScanSpec &s } } OZ(generate_tsc_flags(op, spec)); + OX(spec.set_est_cost_simple_info(op.get_est_cost_simple_info())); bool is_equal_and = true; ObKeyPart* root = spec.tsc_ctdef_.pre_query_range_.get_table_grapth().key_part_head_; diff --git a/src/sql/engine/ob_exec_context.cpp b/src/sql/engine/ob_exec_context.cpp index f01fa49940..a3dfd12e38 100644 --- a/src/sql/engine/ob_exec_context.cpp +++ b/src/sql/engine/ob_exec_context.cpp @@ -139,7 +139,9 @@ ObExecContext::ObExecContext(ObIAllocator &allocator) table_level_slice_idx_(0), slice_row_idx_(0), autoinc_range_interval_(0), - lob_access_ctx_(nullptr) + lob_access_ctx_(nullptr), + auto_dop_map_(), + force_local_plan_(false) { } @@ -212,6 +214,7 @@ ObExecContext::~ObExecContext() lob_access_ctx_->~ObLobAccessCtx(); lob_access_ctx_ = nullptr; } + auto_dop_map_.destroy(); } void ObExecContext::clean_resolve_ctx() diff --git a/src/sql/engine/ob_exec_context.h b/src/sql/engine/ob_exec_context.h index 227a6cdab0..934aad2057 100644 --- a/src/sql/engine/ob_exec_context.h +++ b/src/sql/engine/ob_exec_context.h @@ -30,6 +30,7 @@ #include "pl/ob_pl_package_guard.h" #include "lib/udt/ob_udt_type.h" #include "lib/udt/ob_collection_type.h" +#include "sql/plan_cache/ob_adaptive_auto_dop.h" #define GET_PHY_PLAN_CTX(ctx) ((ctx).get_physical_plan_ctx()) #define GET_MY_SESSION(ctx) ((ctx).get_my_session()) @@ -548,6 +549,9 @@ public: int64_t get_autoinc_range_interval() { return autoinc_range_interval_; } int get_lob_access_ctx(ObLobAccessCtx *&lob_access_ctx); + AutoDopHashMap& get_auto_dop_map() { return auto_dop_map_; } + void set_force_gen_local_plan() { force_local_plan_ = true; } + bool is_force_gen_local_plan() const { return force_local_plan_; } private: int build_temp_expr_ctx(const ObTempExpr &temp_expr, ObTempExprCtx *&temp_expr_ctx); @@ -740,6 +744,8 @@ protected: //--------------- ObLobAccessCtx *lob_access_ctx_; + AutoDopHashMap auto_dop_map_; + bool force_local_plan_; private: DISALLOW_COPY_AND_ASSIGN(ObExecContext); }; diff --git a/src/sql/engine/ob_physical_plan.h b/src/sql/engine/ob_physical_plan.h index 1ac716a2bc..a9242f6930 100644 --- a/src/sql/engine/ob_physical_plan.h +++ b/src/sql/engine/ob_physical_plan.h @@ -551,6 +551,8 @@ public: direct_load_need_sort_ = direct_load_need_sort; } bool get_direct_load_need_sort() const { return direct_load_need_sort_; } + void set_is_use_auto_dop(bool use_auto_dop) { stat_.is_use_auto_dop_ = use_auto_dop; } + bool get_is_use_auto_dop() const { return stat_.is_use_auto_dop_; } public: static const int64_t MAX_PRINTABLE_SIZE = 2 * 1024 * 1024; private: diff --git a/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp index 8ccad9e393..a8232d59ca 100644 --- a/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_fifo_coord_op.cpp @@ -70,7 +70,7 @@ int ObPxFifoCoordOp::inner_open() // use parallel scheduler for ddl to avoid large memory usage because // serial scheduler will hold the memory of intermediate result rows msg_proc_.set_scheduler(¶llel_scheduler_); - } else if (1 == px_dop_) { + } else if (use_serial_scheduler_) { msg_proc_.set_scheduler(&serial_scheduler_); } else { msg_proc_.set_scheduler(¶llel_scheduler_); diff --git a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp index ac77dd2d3b..a5837567fc 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp @@ -136,7 +136,7 @@ int ObPxMSCoordOp::inner_open() } else if (OB_FAIL(setup_loop_proc())) { LOG_WARN("fail setup loop proc", K(ret)); } else { - if (1 == px_dop_) { + if (use_serial_scheduler_) { msg_proc_.set_scheduler(&serial_scheduler_); } else { msg_proc_.set_scheduler(¶llel_scheduler_); diff --git a/src/sql/engine/px/exchange/ob_px_ms_coord_vec_op.cpp b/src/sql/engine/px/exchange/ob_px_ms_coord_vec_op.cpp index def0219df4..116cc5ea90 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_coord_vec_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ms_coord_vec_op.cpp @@ -141,7 +141,7 @@ int ObPxMSCoordVecOp::inner_open() ret = OB_ERR_UNEXPECTED; LOG_WARN("get invalid state", K(ret)); } else { - if (1 == px_dop_) { + if (use_serial_scheduler_) { msg_proc_.set_scheduler(&serial_scheduler_); } else { msg_proc_.set_scheduler(¶llel_scheduler_); diff --git a/src/sql/engine/px/exchange/ob_px_ordered_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_ordered_coord_op.cpp index 033a63d3f0..899640cf1b 100644 --- a/src/sql/engine/px/exchange/ob_px_ordered_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ordered_coord_op.cpp @@ -86,7 +86,7 @@ int ObPxOrderedCoordOp::inner_open() } else if (OB_FAIL(setup_loop_proc())) { LOG_WARN("fail setup loop proc", K(ret)); } else { - if (1 == px_dop_) { + if (use_serial_scheduler_) { msg_proc_.set_scheduler(&serial_scheduler_); } else { msg_proc_.set_scheduler(¶llel_scheduler_); diff --git a/src/sql/engine/px/ob_dfo_mgr.cpp b/src/sql/engine/px/ob_dfo_mgr.cpp index 39c12c7b4b..6625370536 100644 --- a/src/sql/engine/px/ob_dfo_mgr.cpp +++ b/src/sql/engine/px/ob_dfo_mgr.cpp @@ -299,7 +299,9 @@ int ObDfoWorkerAssignment::assign_worker(ObDfoMgr &dfo_mgr, // compatible with version before 4.2 compatible_before_420 = true; scale_rate = static_cast(admited_worker_count) / static_cast(expected_worker_count); - } else if (0 >= admited_worker_count || minimal_worker_count == admited_worker_count) { + } else if (0 >= admited_worker_count) { + scale_rate = 1.0; + } else if (minimal_worker_count == admited_worker_count) { scale_rate = 0.0; } else if (OB_UNLIKELY(minimal_worker_count > admited_worker_count || minimal_worker_count >= expected_worker_count)) { @@ -665,7 +667,7 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx, // 修改成 is_local = true, dop = 1 dfo->set_coord_info_ptr(&px_coord_info); dfo->set_single(transmit->is_px_single()); - dfo->set_dop(transmit->get_px_dop()); + dfo->set_dop(get_adaptive_px_dop(*transmit, exec_ctx)); dfo->set_qc_id(transmit->get_px_id()); dfo->set_dfo_id(transmit->get_dfo_id()); dfo->set_execution_id(exec_ctx.get_my_session()->get_current_execution_id()); @@ -1019,3 +1021,19 @@ int ObDfoMgr::get_running_dfos(ObIArray &dfos) const } return ret; } + +int64_t ObDfoMgr::get_adaptive_px_dop(const ObTransmitSpec &spec, ObExecContext &exec_ctx) const +{ + int ret = OB_SUCCESS; + int64_t px_dop = spec.get_px_dop(); + AutoDopHashMap &auto_dop_map = exec_ctx.get_auto_dop_map(); + if (!auto_dop_map.created()) { + // do nothing + } else if (OB_FAIL(auto_dop_map.get_refactored(spec.get_id(), px_dop))) { + LOG_WARN("failed to get refactored", K(ret)); + } else { + px_dop = px_dop >= 1 ? px_dop : spec.get_px_dop(); + } + LOG_TRACE("adaptive px dop", K(spec.get_id()), K(px_dop)); + return px_dop; +} diff --git a/src/sql/engine/px/ob_dfo_mgr.h b/src/sql/engine/px/ob_dfo_mgr.h index b835a38a4d..b5ee837544 100644 --- a/src/sql/engine/px/ob_dfo_mgr.h +++ b/src/sql/engine/px/ob_dfo_mgr.h @@ -21,6 +21,7 @@ namespace oceanbase namespace sql { +class ObTransmitSpec; class ObPxCoordInfo; class ObDfoMgr { @@ -64,6 +65,7 @@ private: int create_dfo(common::ObIAllocator &allocator, const ObOpSpec *dfo_root_op, ObDfo *&dfo) const; + int64_t get_adaptive_px_dop(const ObTransmitSpec &spec, ObExecContext &exec_ctx) const; protected: common::ObIAllocator &allocator_; bool inited_; diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index b86faf4b6d..e0f32a40bf 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -144,12 +144,12 @@ ObPxCoordOp::ObPxCoordOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInpu interrupt_id_(0), register_detectable_id_(false), detectable_id_(), - px_dop_(1), time_recorder_(0), batch_rescan_param_version_(0), server_alive_checker_(coord_info_.dfo_mgr_, exec_ctx.get_my_session()->get_process_query_time()), last_px_batch_rescan_size_(0), - query_sql_() + query_sql_(), + use_serial_scheduler_(false) {} @@ -368,13 +368,31 @@ int ObPxCoordOp::setup_loop_proc() return OB_ERR_UNEXPECTED; } +int64_t ObPxCoordOp::get_adaptive_px_dop(int64_t dop) const +{ + int ret = OB_SUCCESS; + int64_t px_dop = dop; + AutoDopHashMap &auto_dop_map = ctx_.get_auto_dop_map(); + if (!auto_dop_map.created()) { + // do nothing + } else if (OB_FAIL(auto_dop_map.get_refactored(get_spec().get_id(), px_dop))) { + LOG_WARN("failed to get refactored", K(ret)); + } else { + px_dop = px_dop >= 1 ? px_dop : dop; + } + LOG_TRACE("adaptive px dop", K(get_spec().get_id()), K(px_dop)); + return px_dop; +} + int ObPxCoordOp::post_init_op_ctx() { int ret = OB_SUCCESS; if (OB_FAIL(init_obrpc_proxy(coord_info_.rpc_proxy_))) { LOG_WARN("fail init rpc proxy", K(ret)); } else { - px_dop_ = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan()->get_px_dop(); + int64_t plan_dop = GET_PHY_PLAN_CTX(ctx_)->get_phy_plan()->get_px_dop(); + int64_t adaptive_dop = get_adaptive_px_dop(plan_dop); + use_serial_scheduler_ = ((1 == plan_dop && 1 == adaptive_dop) ? true : false); set_pruning_table_locations(&(static_cast(get_spec()).table_locations_)); } diff --git a/src/sql/engine/px/ob_px_coord_op.h b/src/sql/engine/px/ob_px_coord_op.h index 995c6bfc7b..5ba88d2f9e 100644 --- a/src/sql/engine/px/ob_px_coord_op.h +++ b/src/sql/engine/px/ob_px_coord_op.h @@ -130,6 +130,7 @@ protected: // send rpc to clean dtl interm result of not scheduled dfos. virtual void clean_dfos_dtl_interm_result() = 0; int try_clear_p2p_dh_info(); + int64_t get_adaptive_px_dop(int64_t dop) const; protected: common::ObArenaAllocator allocator_; common::ObArenaAllocator row_allocator_; @@ -151,12 +152,12 @@ protected: ObInterruptibleTaskID interrupt_id_; bool register_detectable_id_; ObDetectableId detectable_id_; - int px_dop_; int64_t time_recorder_; int64_t batch_rescan_param_version_; ObExtraServerAliveCheck server_alive_checker_; int64_t last_px_batch_rescan_size_; ObString query_sql_; + bool use_serial_scheduler_; }; class ObPxCoordSpec : public ObPxReceiveSpec diff --git a/src/sql/engine/table/ob_table_scan_op.cpp b/src/sql/engine/table/ob_table_scan_op.cpp index e108db22c4..fd63791468 100644 --- a/src/sql/engine/table/ob_table_scan_op.cpp +++ b/src/sql/engine/table/ob_table_scan_op.cpp @@ -546,7 +546,8 @@ ObTableScanSpec::ObTableScanSpec(ObIAllocator &alloc, const ObPhyOperatorType ty flags_(0), tenant_id_col_idx_(0), partition_id_calc_type_(0), - parser_name_() + parser_name_(), + est_cost_simple_info_() { } diff --git a/src/sql/engine/table/ob_table_scan_op.h b/src/sql/engine/table/ob_table_scan_op.h index 9dbf65fe4a..f47cafc7bc 100644 --- a/src/sql/engine/table/ob_table_scan_op.h +++ b/src/sql/engine/table/ob_table_scan_op.h @@ -357,6 +357,14 @@ public: inline bool is_spatial_ddl() const { return is_spatial_ddl_; } inline void set_multivalue_ddl(bool is_multivalue_ddl) { is_multivalue_ddl_ = is_multivalue_ddl; } inline bool is_multivalue_ddl() const { return is_multivalue_ddl_; } + void set_est_cost_simple_info(const ObCostTableScanSimpleInfo &info) + { + est_cost_simple_info_ = info; + } + ObCostTableScanSimpleInfo& get_est_cost_simple_info() { return est_cost_simple_info_; } + const ObCostTableScanSimpleInfo& get_est_cost_simple_info() const { return est_cost_simple_info_; } + ObQueryFlag get_query_flag() const { return tsc_ctdef_.scan_flags_; } + DECLARE_VIRTUAL_TO_STRING; public: @@ -453,6 +461,7 @@ public: int64_t partition_id_calc_type_; common::ObString parser_name_; // word segment for ddl. + ObCostTableScanSimpleInfo est_cost_simple_info_; }; class ObTableScanOp : public ObOperator diff --git a/src/sql/ob_sql.cpp b/src/sql/ob_sql.cpp index 2e02f61936..730a87d69f 100644 --- a/src/sql/ob_sql.cpp +++ b/src/sql/ob_sql.cpp @@ -4680,6 +4680,7 @@ int ObSql::pc_add_plan(ObPlanCacheCtx &pc_ctx, phy_plan->stat_.is_rewrite_sql_ = pc_ctx.is_rewrite_sql_; phy_plan->stat_.rule_version_ = rule_mgr->get_rule_version(); phy_plan->stat_.enable_udr_ = enable_udr; + phy_plan->stat_.is_inner_ = result.get_session().is_inner(); if (PC_PS_MODE == pc_ctx.mode_ || PC_PL_MODE == pc_ctx.mode_) { // pc_key_ may be modified elsewhere, so reset it before adding plan diff --git a/src/sql/ob_sql_utils.cpp b/src/sql/ob_sql_utils.cpp index f6831fd70b..f249582e80 100644 --- a/src/sql/ob_sql_utils.cpp +++ b/src/sql/ob_sql_utils.cpp @@ -12,6 +12,7 @@ #define USING_LOG_PREFIX SQL_OPT #include "sql/ob_sql_utils.h" +#include #include "sql/ob_sql.h" #include #include diff --git a/src/sql/ob_sql_utils.h b/src/sql/ob_sql_utils.h index d841597b15..aaf38d268c 100644 --- a/src/sql/ob_sql_utils.h +++ b/src/sql/ob_sql_utils.h @@ -23,6 +23,7 @@ #include "share/ob_i_sql_expression.h" // ObISqlExpression,ObExprCtx #include "share/schema/ob_table_param.h" // ObColDesc #include "share/schema/ob_multi_version_schema_service.h" // ObMultiVersionSchemaService +#include "share/ob_simple_batch.h" #include "sql/ob_phy_table_location.h" #include "sql/ob_sql_define.h" #include "sql/parser/parse_node.h" diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index 058367b2da..322cef1112 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -2843,6 +2843,8 @@ int ObLogPlan::allocate_access_path(AccessPath *ap, LOG_ERROR("failed to allocate table/index operator", K(ret)); } else if (OB_FAIL(scan->set_est_row_count_record(ap->est_records_))) { LOG_WARN("failed to set estimation info", K(ret)); + } else if (OB_FAIL(scan->init_est_cost_simple_info(ap->get_cost_table_scan_info()))) { + LOG_WARN("failed to init est cost simple info", K(ret)); } else { scan->set_est_cost_info(&ap->get_cost_table_scan_info()); scan->set_flashback_query_expr(table_item->flashback_query_expr_); diff --git a/src/sql/optimizer/ob_log_table_scan.h b/src/sql/optimizer/ob_log_table_scan.h index 70cc49bec4..cfcd054c4e 100644 --- a/src/sql/optimizer/ob_log_table_scan.h +++ b/src/sql/optimizer/ob_log_table_scan.h @@ -393,6 +393,11 @@ public: ObCostTableScanInfo *get_est_cost_info() { return est_cost_info_; } + int init_est_cost_simple_info(const ObCostTableScanInfo &est_info) + { return est_cost_simple_info_.init(est_info); } + ObCostTableScanSimpleInfo& get_est_cost_simple_info() { return est_cost_simple_info_; } + const ObCostTableScanSimpleInfo& get_est_cost_simple_info() const { return est_cost_simple_info_; } + int set_update_info(); void set_part_ids(const common::ObIArray *part_ids) { part_ids_ = part_ids; } @@ -825,6 +830,7 @@ protected: // memeber variables // 记录该表是否采样、采样方式、比例等信息 SampleInfo sample_info_; ObCostTableScanInfo *est_cost_info_; + ObCostTableScanSimpleInfo est_cost_simple_info_; BaseTableOptInfo *table_opt_info_; common::ObSEArray est_records_; diff --git a/src/sql/optimizer/ob_opt_est_cost_model.cpp b/src/sql/optimizer/ob_opt_est_cost_model.cpp index 04460ebf17..60b08ec098 100644 --- a/src/sql/optimizer/ob_opt_est_cost_model.cpp +++ b/src/sql/optimizer/ob_opt_est_cost_model.cpp @@ -2445,3 +2445,161 @@ int ObOptEstCostModel::calc_pred_cost_per_row(const ObRawExpr *expr, } return ret; } + + +OB_SERIALIZE_MEMBER(ObCostTableScanSimpleInfo, + is_index_back_, + is_global_index_, + part_count_, + table_micro_blocks_, + index_micro_blocks_, + range_count_, + table_row_count_, + postix_filter_qual_cost_per_row_, + table_filter_qual_cost_per_row_, + index_scan_project_cost_per_row_, + index_back_project_cost_per_row_, + row_width_, + is_spatial_index_, + index_id_); + +int ObCostTableScanSimpleInfo::init(const ObCostTableScanInfo &est_cost_info) +{ + int ret = OB_SUCCESS; + const ObTableMetaInfo *table_meta_info = est_cost_info.table_meta_info_; + const ObIndexMetaInfo &index_meta_info = est_cost_info.index_meta_info_; + OptSystemStat default_stat; + default_stat.set_cpu_speed(DEFAULT_CPU_SPEED); + default_stat.set_disk_seq_read_speed(DEFAULT_DISK_SEQ_READ_SPEED); + default_stat.set_disk_rnd_read_speed(DEFAULT_DISK_RND_READ_SPEED); + default_stat.set_network_speed(DEFAULT_NETWORK_SPEED); + ObOptEstVectorCostModel vector_model(cost_params_vector, default_stat); + if (OB_ISNULL(table_meta_info)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", K(ret)); + } else if (OB_FAIL(vector_model.cost_project(1, + est_cost_info.access_column_items_, + false, + false, + index_scan_project_cost_per_row_))) { + LOG_WARN("failed to cost project", K(ret)); + } else if (OB_FAIL(vector_model.cost_project(1, + est_cost_info.index_access_column_items_, + true, + false, + index_back_project_cost_per_row_))) { + LOG_WARN("failed to cost project", K(ret)); + } else { + is_index_back_ = est_cost_info.index_meta_info_.is_index_back_; + is_global_index_ = est_cost_info.index_meta_info_.is_global_index_; + part_count_ = est_cost_info.index_meta_info_.index_part_count_; + part_count_ = part_count_ < 1 ? 1 : part_count_; + index_micro_blocks_ = index_meta_info.get_micro_block_numbers() / part_count_; + index_micro_blocks_ = index_micro_blocks_ < 1 ? 1 : index_micro_blocks_; + range_count_ = est_cost_info.ranges_.count(); + is_spatial_index_ = est_cost_info.index_meta_info_.is_geo_index_; + table_micro_blocks_ = table_meta_info->get_micro_block_numbers() / part_count_; + table_micro_blocks_ = table_micro_blocks_ < 1 ? 1 : table_micro_blocks_; + table_row_count_ = static_cast(table_meta_info->table_row_count_) / part_count_; + postix_filter_qual_cost_per_row_ = vector_model.cost_quals(1, est_cost_info.postfix_filters_, false); + table_filter_qual_cost_per_row_ = vector_model.cost_quals(1, est_cost_info.table_filters_, false); + row_width_ = table_meta_info->average_row_size_; + index_id_ = est_cost_info.index_id_; + } + return ret; +} + +int ObCostTableScanSimpleInfo::calculate_table_dop(double range_row_count, + double index_back_row_count, + int64_t part_cnt, + int64_t cost_threshold_us, + int64_t parallel_degree_limit, + int64_t &table_dop) const +{ + int ret = OB_SUCCESS; + table_dop = ObGlobalHint::UNSET_PARALLEL; + double pre_cost = -1.0; + double cost = 0.0; + double px_cost = 0.0; + int64_t pre_parallel = ObGlobalHint::UNSET_PARALLEL; + int64_t cur_parallel = ObGlobalHint::DEFAULT_PARALLEL; + if (table_row_count_ <= 0 || part_count_ < 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpect param", K(ret)); + } + OPT_TRACE_TITLE("calculate table dop"); + OPT_TRACE(KV_(is_index_back), KV_(is_global_index), KV_(part_count), + KV_(table_micro_blocks), KV_(index_micro_blocks), KV_(range_count), + KV_(table_row_count), KV_(postix_filter_qual_cost_per_row), + KV_(table_filter_qual_cost_per_row), KV_(index_scan_project_cost_per_row), + KV_(index_back_project_cost_per_row), KV_(row_width)); + while (OB_SUCC(ret) && ObGlobalHint::UNSET_PARALLEL == table_dop) { + cost = calculate_table_scan_cost(range_row_count, index_back_row_count, part_cnt, cur_parallel); + px_cost = cur_parallel > 1 ? 0.1 * cur_parallel * cur_parallel : 0.0; + OPT_TRACE("dop:", cur_parallel, ", cost:", cost, ", px_cost:", px_cost); + if (cur_parallel >= parallel_degree_limit) { + table_dop = cur_parallel; + } else if (pre_cost > 0 && pre_cost <= cost) { + table_dop = pre_parallel; + } else if (cost - px_cost <= cost_threshold_us || px_cost >= cost - px_cost) { + table_dop = cur_parallel; + } else { + pre_cost = cost; + pre_parallel = cur_parallel; + ++cur_parallel; + } + } + OPT_TRACE("table dop:", table_dop, ", parallel_degree_limit:", parallel_degree_limit, + ", cost_threshold_us:", cost_threshold_us, ", range_row_count:", range_row_count, + ", part_cnt:", part_cnt); + return ret; +} + +double ObCostTableScanSimpleInfo::calculate_table_scan_cost(double range_row_count, + double index_back_row_count, + int64_t part_cnt, + int64_t parallel) const +{ + double cost = 0.0; + double part_count_per_dop = part_cnt * 1.0/parallel; + OptSystemStat default_stat; + default_stat.set_cpu_speed(DEFAULT_CPU_SPEED); + default_stat.set_disk_seq_read_speed(DEFAULT_DISK_SEQ_READ_SPEED); + default_stat.set_disk_rnd_read_speed(DEFAULT_DISK_RND_READ_SPEED); + default_stat.set_network_speed(DEFAULT_NETWORK_SPEED); + range_row_count /= part_cnt; + index_back_row_count /= part_cnt; + //calc index scan cost + { + int64_t num_micro_blocks_read = std::ceil(index_micro_blocks_ * range_row_count / table_row_count_); + double first_block_cost = cost_params_vector.get_micro_block_rnd_cost(default_stat); + double io_cost = first_block_cost + cost_params_vector.get_micro_block_seq_cost(default_stat) * (num_micro_blocks_read-1); + double range_cost = range_count_ * cost_params_vector.get_range_cost(default_stat); + double qual_cost = postix_filter_qual_cost_per_row_ * range_row_count; + if (!is_index_back_) { + qual_cost += table_filter_qual_cost_per_row_ * range_row_count; + } + double project_cost = index_scan_project_cost_per_row_ * range_row_count; + double cpu_cost = range_cost + qual_cost + project_cost; + cost = io_cost > cpu_cost ? io_cost : cpu_cost; + } + if (is_index_back_) { + //calc index back cost + int64_t num_micro_blocks_read = table_micro_blocks_; + if (OB_LIKELY(table_row_count_ > 0 && index_back_row_count <= table_row_count_)) { + num_micro_blocks_read = table_micro_blocks_ * (1.0 - std::pow((1.0 - index_back_row_count / table_row_count_), table_row_count_ / table_micro_blocks_)); + num_micro_blocks_read = std::ceil(num_micro_blocks_read); + } + double io_cost = cost_params_vector.get_micro_block_rnd_cost(default_stat) * num_micro_blocks_read; + double qual_cost = table_filter_qual_cost_per_row_ * range_row_count; + double project_cost = index_back_project_cost_per_row_ * range_row_count; + double cpu_cost = qual_cost + project_cost; + cost += io_cost + cpu_cost; + if(is_global_index_) { + double network_cost = row_width_ * index_back_row_count * cost_params_vector.get_network_trans_per_byte_cost(default_stat) + + index_back_row_count * cost_params_vector.get_table_loopup_per_row_rpc_cost(default_stat); + } + } + cost *= part_count_per_dop; + return cost; +} \ No newline at end of file diff --git a/src/sql/optimizer/ob_opt_est_cost_model.h b/src/sql/optimizer/ob_opt_est_cost_model.h index 45a400329d..1796af3a46 100644 --- a/src/sql/optimizer/ob_opt_est_cost_model.h +++ b/src/sql/optimizer/ob_opt_est_cost_model.h @@ -935,6 +935,58 @@ protected: DISALLOW_COPY_AND_ASSIGN(ObOptEstCostModel); }; +class ObCostTableScanSimpleInfo { + OB_UNIS_VERSION_V(1); +public: + ObCostTableScanSimpleInfo() + :is_index_back_(false), + is_global_index_(false), + part_count_(1), + table_micro_blocks_(0), + index_micro_blocks_(0), + range_count_(1), + table_row_count_(1), + postix_filter_qual_cost_per_row_(0), + table_filter_qual_cost_per_row_(0), + index_scan_project_cost_per_row_(0), + index_back_project_cost_per_row_(0), + row_width_(0), + is_spatial_index_(false), + index_id_(0) + { } + ~ObCostTableScanSimpleInfo()=default; + int init(const ObCostTableScanInfo &est_cost_info); + int calculate_table_dop(double range_row_count, + double index_back_row_count, + int64_t part_cnt, + int64_t cost_threshold_us, + int64_t parallel_degree_limit, + int64_t &table_dop) const; + int64_t get_range_columns_count() const { return range_count_; } + bool get_is_spatial_index() const { return is_spatial_index_; } + int64_t get_index_id() const { return index_id_; } + +private: + double calculate_table_scan_cost(double range_row_count, + double index_back_row_count, + int64_t part_cnt, + int64_t parallel) const; + bool is_index_back_; + bool is_global_index_; + int64_t part_count_; + int64_t table_micro_blocks_; + int64_t index_micro_blocks_; + int64_t range_count_; + double table_row_count_; + double postix_filter_qual_cost_per_row_; + double table_filter_qual_cost_per_row_; + double index_scan_project_cost_per_row_; + double index_back_project_cost_per_row_; + double row_width_; + bool is_spatial_index_; + int64_t index_id_; +}; + } } diff --git a/src/sql/optimizer/ob_optimizer.cpp b/src/sql/optimizer/ob_optimizer.cpp index 6443adbf30..dd7992750f 100644 --- a/src/sql/optimizer/ob_optimizer.cpp +++ b/src/sql/optimizer/ob_optimizer.cpp @@ -699,9 +699,11 @@ int ObOptimizer::init_parallel_policy(ObDMLStmt &stmt, const ObSQLSessionInfo &s int64_t session_force_parallel_dop = ObGlobalHint::UNSET_PARALLEL; bool session_enable_auto_dop = false; bool session_enable_manual_dop = false; - if (OB_ISNULL(ctx_.get_query_ctx())) { + if (OB_ISNULL(ctx_.get_query_ctx()) || OB_ISNULL(ctx_.get_exec_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("query ctx is nul", K(ret)); + } else if (ctx_.get_exec_ctx()->is_force_gen_local_plan()) { + ctx_.set_parallel_rule(PXParallelRule::PL_UDF_DAS_FORCE_SERIALIZE); } else if (ctx_.has_pl_udf()) { //following above rule, but if stmt contain pl_udf, force das, parallel should be 1 ctx_.set_parallel_rule(PXParallelRule::PL_UDF_DAS_FORCE_SERIALIZE); diff --git a/src/sql/plan_cache/ob_adaptive_auto_dop.cpp b/src/sql/plan_cache/ob_adaptive_auto_dop.cpp new file mode 100644 index 0000000000..99f5097031 --- /dev/null +++ b/src/sql/plan_cache/ob_adaptive_auto_dop.cpp @@ -0,0 +1,437 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SQL_PC +#include "sql/plan_cache/ob_adaptive_auto_dop.h" +#include "sql/engine/table/ob_table_scan_op.h" +#include "sql/optimizer/ob_access_path_estimation.h" +#include "sql/optimizer/ob_storage_estimator.h" +#include "sql/optimizer/ob_optimizer_util.h" +#include "share/ob_rpc_struct.h" + +using namespace oceanbase::common; + +namespace oceanbase +{ +namespace sql +{ +int ObAdaptiveAutoDop::calculate_table_auto_dop(const ObPhysicalPlan &plan, AutoDopHashMap &map, + bool &is_single_part) +{ + int ret = OB_SUCCESS; + is_single_part = false; + int64_t table_dop = -1; + ObMemAttr attr(MTL_ID(), "AutoDopMap"); + const ObOpSpec *root_spec = plan.get_root_op_spec(); + if (OB_ISNULL(root_spec)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("root spec is null", K(ret)); + } else if (map.created()) { + map.clear(); + } else if (OB_FAIL(map.create(common::hash::cal_next_prime(256), attr, attr))) { + LOG_WARN("create hash map failed", K(ret)); + } + OZ(inner_calculate_table_auto_dop(*root_spec, map, table_dop, is_single_part)); + return ret; +} + +int ObAdaptiveAutoDop::inner_calculate_table_auto_dop(const ObOpSpec &spec, AutoDopHashMap &map, + int64_t &table_dop, bool &is_single_part) +{ + int ret = OB_SUCCESS; + table_dop = -1; + if (spec.is_table_scan()) { + if (OB_FAIL(calculate_tsc_auto_dop(spec, table_dop, is_single_part))) { + LOG_WARN("failed to calculate tsc auto dop", K(ret)); + } + } else { + int64_t child_cnt = spec.get_child_cnt(); + for (int64_t i = 0; i < child_cnt; i++) { + int64_t dop = -1; + const ObOpSpec *child_spec = spec.get_child(i); + if (OB_ISNULL(child_spec)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("spec is null", K(ret)); + } else if (OB_FAIL(SMART_CALL( + inner_calculate_table_auto_dop(*child_spec, map, dop, is_single_part)))) { + LOG_WARN("failed to recalulate table dop", K(ret)); + } else { + // currently only supports single table, non-table_scan operator, dop directly + // inherits from the child + table_dop = table_dop > dop ? table_dop : dop; + } + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(map.set_refactored(spec.get_id(), table_dop))) { + LOG_WARN("set refactored failed", K(ret), K(spec.get_id()), K(table_dop)); + } + return ret; +} + +int ObAdaptiveAutoDop::calculate_tsc_auto_dop(const ObOpSpec &spec, int64_t &table_dop, + bool &is_single_part) +{ + int ret = OB_SUCCESS; + table_dop = -1; + int64_t part_cnt = 0; + bool res_reliable = true; + ObArray tasks; + common::ObIAllocator &allocator = ctx_.get_allocator(); + const ObTableScanSpec &tsc_spec = static_cast(spec); + const ObCostTableScanSimpleInfo &cost_tsc_info = tsc_spec.get_est_cost_simple_info(); + bool is_oracle_agent_table = + share::is_oracle_mapping_real_virtual_table(tsc_spec.get_ref_table_id()); + ObQueryRangeArray key_ranges; + const ObQueryRange &query_range = tsc_spec.get_query_range(); + + if (is_oracle_agent_table || cost_tsc_info.get_is_spatial_index()) { + // step1: calculate query range. + } else if (query_range.has_range() + && OB_FAIL(ObSQLUtils::extract_pre_query_range( + query_range, ctx_.get_allocator(), ctx_, key_ranges, + ObBasicSessionInfo::create_dtc_params(ctx_.get_my_session())))) { + LOG_WARN("failed to extract pre query ranges", K(ret)); + // step2: build est tasks. + } else if (OB_FAIL(build_storage_estimation_tasks(tsc_spec, cost_tsc_info, key_ranges, tasks, + is_single_part, part_cnt))) { + LOG_WARN("failed to build storage estimation tasks", K(ret)); + // step3: do storeage estimation. + } else if (OB_FAIL(do_storage_estimation(tasks, res_reliable))) { + LOG_WARN("failed to do storage estimation", K(ret)); + // step4: calculate tsc auto dop + } else if (res_reliable && tasks.count() > 0 + && OB_FAIL(calculate_tsc_auto_dop(tasks, cost_tsc_info, part_cnt, table_dop))) { + LOG_WARN("failed to calculate table dop", K(ret)); + } + return ret; +} + +int ObAdaptiveAutoDop::choose_storage_estimation_partitions(const int64_t partition_limit, + const DASTabletLocSEArray &tablet_locs, + DASTabletLocSEArray &chosen_tablet_locs) +{ + int ret = OB_SUCCESS; + ObSqlBitSet<> min_max_index; + int64_t min_index = 0; + int64_t max_index = 0; + const int STORAGE_EST_SAMPLE_SEED = 1; + if (partition_limit <= 0 || partition_limit >= tablet_locs.count()) { + if (OB_FAIL(chosen_tablet_locs.assign(tablet_locs))) { + LOG_WARN("failed to assign", K(ret)); + } + } else { + for (int64_t i = 1; i < tablet_locs.count(); i ++) { + if (tablet_locs.at(i)->tablet_id_.id() < + tablet_locs.at(min_index)->tablet_id_.id()) { + min_index = i; + } + if (tablet_locs.at(i)->tablet_id_.id() > + tablet_locs.at(max_index)->tablet_id_.id()) { + max_index = i; + } + } + if (OB_FAIL(min_max_index.add_member(min_index)) || + OB_FAIL(min_max_index.add_member(max_index))) { + LOG_WARN("failed to add member", K(ret)); + } else if (OB_FAIL(ObOptimizerUtil::choose_random_members( + STORAGE_EST_SAMPLE_SEED, tablet_locs, partition_limit, + chosen_tablet_locs, &min_max_index))) { + LOG_WARN("failed to choose random partitions", K(ret), K(partition_limit)); + } + } + return ret; +} + +int ObAdaptiveAutoDop::build_storage_estimation_tasks( + const ObTableScanSpec &tsc_spec, const ObCostTableScanSimpleInfo &cost_tsc_info, + ObQueryRangeArray &ranges, ObIArray &tasks, bool &is_single_part, + int64_t &part_cnt) +{ + int ret = OB_SUCCESS; + ObDASTableLoc *match_table_loc = nullptr; + ObDASCtx &das_ctx = ctx_.get_das_ctx(); + const ObSimpleTableSchemaV2 *table_schema = nullptr; + ObSqlCtx *sql_ctx = ctx_.get_sql_ctx(); + common::ObIAllocator &allocator = ctx_.get_allocator(); + const DASTableLocList &table_locs = das_ctx.get_table_loc_list(); + const int64_t index_id = cost_tsc_info.get_index_id(); + const int64_t MAX_EST_TASK_NUM = 10; + + FOREACH(table_loc, table_locs) + { + if (index_id == (*table_loc)->get_ref_table_id()) { + match_table_loc = (*table_loc); + break; + } + } + if (OB_ISNULL(sql_ctx) || OB_ISNULL(match_table_loc)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("is null", K(ret), K(sql_ctx), K(match_table_loc)); + } else if (OB_FAIL( + sql_ctx->schema_guard_->get_simple_table_schema(MTL_ID(), index_id, table_schema))) { + LOG_WARN("failed to get simple table schema", K(ret), K(MTL_ID()), K(index_id)); + } else if (OB_ISNULL(table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null schema", K(ret)); + } else { + const DASTabletLocList &tablet_locs = match_table_loc->get_tablet_locs(); + is_single_part = (1 == tablet_locs.size()); + part_cnt = tablet_locs.size(); + int64_t schema_version = table_schema->get_schema_version(); + DASTabletLocSEArray candi_tablet_locs; + DASTabletLocSEArray chosen_tablet_locs; + if (OB_FAIL(candi_tablet_locs.reserve(tablet_locs.size()))) { + LOG_WARN("failed to reserve", K(tablet_locs.size())); + } else { + FOREACH_X(tablet_loc, tablet_locs, OB_SUCC(ret)) + { + if (OB_FAIL(candi_tablet_locs.push_back(*tablet_loc))) { + LOG_WARN("failed to push back", K(ret)); + } + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(choose_storage_estimation_partitions(MAX_EST_TASK_NUM, candi_tablet_locs, + chosen_tablet_locs))) { + LOG_WARN("failto to choose storage est partitions", K(ret)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < chosen_tablet_locs.count(); i++) { + if (OB_FAIL(add_estimation_tasks(tsc_spec, cost_tsc_info, schema_version, + chosen_tablet_locs.at(i), ranges, tasks))) { + LOG_WARN("failed to add estimation tasks", K(ret)); + } + } + } + } + return ret; +} + +int ObAdaptiveAutoDop::get_task(ObIArray &tasks, const ObAddr &addr, + ObBatchEstTasks *&task) +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < tasks.count(); ++i) { + if (OB_ISNULL(tasks.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid batch estimation task", K(ret)); + } else if (tasks.at(i)->addr_ == addr) { + task = tasks.at(i); + } + } + return ret; +} + +int ObAdaptiveAutoDop::add_estimation_tasks(const ObTableScanSpec &tsc_spec, + const ObCostTableScanSimpleInfo &cost_tsc_info, + const int64_t schema_version, + ObDASTabletLoc *tablet_loc, ObQueryRangeArray &ranges, + ObIArray &tasks) +{ + int ret = OB_SUCCESS; + void *ptr = nullptr; + ObBatchEstTasks *task = nullptr; + ObSqlCtx *sql_ctx = ctx_.get_sql_ctx(); + common::ObIAllocator &allocator = ctx_.get_allocator(); + obrpc::ObEstPartArgElement *index_est_arg = NULL; + if (OB_FAIL(get_task(tasks, tablet_loc->server_, task))) { + LOG_WARN("failed to get task", K(ret)); + } else if (NULL != task) { + // do nothing + } else if (OB_ISNULL(ptr = allocator.alloc(sizeof(ObBatchEstTasks)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("memory is not enough", K(ret)); + } else if (FALSE_IT(task = new (ptr) ObBatchEstTasks())) { + } else if (OB_FAIL(tasks.push_back(task))) { + LOG_WARN("failed to add task", K(ret)); + } + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(index_est_arg = task->arg_.index_params_.alloc_place_holder())) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate index argument", K(ret)); + } else { + task->addr_ = tablet_loc->server_; + task->arg_.schema_version_ = schema_version; + index_est_arg->index_id_ = cost_tsc_info.get_index_id(); + index_est_arg->scan_flag_ = tsc_spec.get_query_flag(); + index_est_arg->range_columns_count_ = cost_tsc_info.get_range_columns_count(); + index_est_arg->tablet_id_ = tablet_loc->tablet_id_; + index_est_arg->ls_id_ = tablet_loc->ls_id_; + index_est_arg->tenant_id_ = MTL_ID(); + index_est_arg->tx_id_ = sql_ctx->session_info_->get_tx_id(); + if (OB_FAIL(construct_scan_range_batch(allocator, ranges, index_est_arg->batch_))) { + LOG_WARN("failed to construct scan range batch", K(ret)); + } + } + return ret; +} + +int ObAdaptiveAutoDop::construct_scan_range_batch(ObIAllocator &allocator, + const ObQueryRangeArray &scan_ranges, + ObSimpleBatch &batch) +{ + int ret = OB_SUCCESS; + // FIXME, consider the lifetime of ObSimpleBatch, how to deconstruct the ObSEArray + if (scan_ranges.empty()) { + batch.type_ = ObSimpleBatch::T_NONE; + } else if (scan_ranges.count() == 1) { // T_SCAN + void *ptr = allocator.alloc(sizeof(SQLScanRange)); + if (OB_ISNULL(ptr)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc memory", K(ptr), K(ret)); + } else { + SQLScanRange *range = new (ptr) SQLScanRange(); + *range = *(scan_ranges.at(0)); + batch.type_ = ObSimpleBatch::T_SCAN; + batch.range_ = range; + } + } else { // T_MULTI_SCAN + SQLScanRangeArray *range_array = NULL; + void *ptr = allocator.alloc(sizeof(SQLScanRangeArray)); + if (OB_ISNULL(ptr)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc memory", K(ptr), K(ret)); + } else { + range_array = new (ptr) SQLScanRangeArray(); + batch.type_ = ObSimpleBatch::T_MULTI_SCAN; + batch.ranges_ = range_array; + int64_t size = std::min(scan_ranges.count(), ObOptEstCost::MAX_STORAGE_RANGE_ESTIMATION_NUM); + for (int64_t i = 0; OB_SUCC(ret) && i < size; ++i) { + if (OB_FAIL(range_array->push_back(*(scan_ranges.at(i))))) { + LOG_WARN("failed to push back scan range", K(ret)); + } + } + } + } + return ret; +} + +int ObAdaptiveAutoDop::do_storage_estimation(ObIArray &tasks, bool &res_reliable) +{ + int ret = OB_SUCCESS; + res_reliable = true; + for (int64_t i = 0; OB_SUCC(ret) && res_reliable && i < tasks.count(); ++i) { + if (OB_ISNULL(tasks.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("task is null", K(ret)); + } else if (OB_FAIL(do_storage_estimation(*tasks.at(i)))) { + res_reliable = false; + LOG_WARN("failed to do storage estimation", K(ret)); + break; + } + } + return ret; +} + +int ObAdaptiveAutoDop::do_storage_estimation(ObBatchEstTasks &tasks) +{ + int ret = OB_SUCCESS; + const ObAddr &addr = tasks.addr_; + const obrpc::ObEstPartArg &arg = tasks.arg_; + obrpc::ObEstPartRes &result = tasks.res_; + ObSqlCtx *sql_ctx = ctx_.get_sql_ctx(); + if (addr == GCTX.self_addr()) { + if (OB_FAIL(ObStorageEstimator::estimate_row_count(arg, result))) { + LOG_WARN("failed to estimate partition rows", K(ret)); + } + } else { + obrpc::ObSrvRpcProxy *rpc_proxy = GCTX.srv_rpc_proxy_; + int64_t timeout = -1; + if (OB_ISNULL(sql_ctx->session_info_) || OB_ISNULL(rpc_proxy)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("rpc_proxy or session is null", K(ret), K(rpc_proxy), K(sql_ctx->session_info_)); + } else if (0 >= (timeout = THIS_WORKER.get_timeout_remain())) { + ret = OB_TIMEOUT; + LOG_WARN("query timeout is reached", K(ret), K(timeout)); + } else if (OB_FAIL(rpc_proxy->to(addr) + .timeout(timeout) + .by(sql_ctx->session_info_->get_rpc_tenant_id()) + .estimate_partition_rows(arg, result))) { + LOG_WARN("OPT:[REMOTE STORAGE EST FAILED]", K(ret)); + } + } + return ret; +} + +int ObAdaptiveAutoDop::calculate_tsc_auto_dop(const ObIArray &tasks, + const ObCostTableScanSimpleInfo &cost_tsc_info, + int64_t part_cnt, int64_t &table_dop) +{ + int ret = OB_SUCCESS; + table_dop = -1; + int64_t range_row_count = 0; + ObSqlCtx *sql_ctx = ctx_.get_sql_ctx(); + uint64_t parallel_degree_limit = 0; + uint64_t parallel_min_scan_time_threshold = 1000; + uint64_t parallel_servers_target = 0; + uint64_t unit_min_cpu = 0; + uint64_t cost_threshold_us = 0; + const int64_t MAX_EST_TASK_NUM = 10; + for (int64_t i = 0; i < tasks.count(); ++i) { + const ObBatchEstTasks *task = tasks.at(i); + for (int64_t j = 0; j < task->res_.index_param_res_.count(); ++j) { + range_row_count += task->res_.index_param_res_.at(j).logical_row_count_; + } + } + if (part_cnt > MAX_EST_TASK_NUM) { + range_row_count = range_row_count / MAX_EST_TASK_NUM * part_cnt; + } + if (OB_FAIL(sql_ctx->session_info_->get_sys_variable(share::SYS_VAR_PARALLEL_DEGREE_LIMIT, + parallel_degree_limit))) { + LOG_WARN("failed to get sys variable parallel degree limit", K(ret)); + } else if (OB_FAIL(sql_ctx->session_info_->get_sys_variable( + share::SYS_VAR_PARALLEL_MIN_SCAN_TIME_THRESHOLD, + parallel_min_scan_time_threshold))) { + LOG_WARN("failed to get sys variable parallel threshold", K(ret)); + } + if (OB_SUCC(ret) && 0 == parallel_degree_limit) { + const ObTenantBase *tenant = NULL; + int64_t parallel_servers_target = 0; + if (OB_ISNULL(tenant = MTL_CTX())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null", K(ret)); + } else if (sql_ctx->session_info_->is_user_session() + && OB_FAIL(ObSchemaUtils::get_tenant_int_variable( + sql_ctx->session_info_->get_effective_tenant_id(), + SYS_VAR_PARALLEL_SERVERS_TARGET, parallel_servers_target))) { + LOG_WARN("fail read tenant variable", K(ret), + K(sql_ctx->session_info_->get_effective_tenant_id())); + } else { + unit_min_cpu = std::max(tenant->unit_min_cpu(), 0.0); + parallel_servers_target = std::max(parallel_servers_target, 0L); + } + } + if (OB_SUCC(ret)) { + int64_t server_cnt = 1; + if (0 < parallel_degree_limit) { + // do nothing + } else if (0 >= parallel_servers_target || 0 >= unit_min_cpu || 0 >= server_cnt) { + parallel_degree_limit = std::max(parallel_servers_target, server_cnt * unit_min_cpu); + } else { + parallel_degree_limit = std::min(parallel_servers_target, server_cnt * unit_min_cpu); + } + cost_threshold_us = 1000.0 * std::max(10UL, parallel_min_scan_time_threshold); + parallel_degree_limit = std::max(parallel_degree_limit, 1UL); + } + BEGIN_OPT_TRACE(sql_ctx->session_info_, ""); + if (OB_FAIL(ret)) { + } else if (OB_FAIL(cost_tsc_info.calculate_table_dop(range_row_count, range_row_count, part_cnt, + cost_threshold_us, parallel_degree_limit, + table_dop))) { + LOG_WARN("failed to calculate table dop", K(ret)); + } + END_OPT_TRACE(sql_ctx->session_info_); + return ret; +} + +} // namespace common +} // namespace oceanbase diff --git a/src/sql/plan_cache/ob_adaptive_auto_dop.h b/src/sql/plan_cache/ob_adaptive_auto_dop.h new file mode 100644 index 0000000000..9e2b540f33 --- /dev/null +++ b/src/sql/plan_cache/ob_adaptive_auto_dop.h @@ -0,0 +1,80 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_SQL_PLAN_CACHE_OB_ADAPTIVE_AUTO_DOP_ +#define OCEANBASE_SQL_PLAN_CACHE_OB_ADAPTIVE_AUTO_DOP_ + +#include "lib/container/ob_se_array.h" +#include "lib/hash/ob_hashmap.h" +#include "sql/engine/ob_physical_plan.h" + +namespace oceanbase +{ +namespace common +{ +class ObIAllocator; +} + +namespace sql +{ +class ObBatchEstTasks; +class ObCostTableScanSimpleInfo; +class ObDASTabletLoc; +typedef common::hash::ObHashMap AutoDopHashMap; + +class ObAdaptiveAutoDop +{ +public: + ObAdaptiveAutoDop(ObExecContext &exec_ctx) + : ctx_(exec_ctx) + { + + } + int calculate_table_auto_dop(const ObPhysicalPlan &plan, + AutoDopHashMap &map, + bool &is_single_part); + + VIRTUAL_TO_STRING_KV(K(&ctx_)); + +private: + int inner_calculate_table_auto_dop(const ObOpSpec &spec, AutoDopHashMap &map, int64_t &table_dop, + bool &is_single_part); + int calculate_tsc_auto_dop(const ObOpSpec &spec, int64_t &table_dop, bool &is_single_part); + int build_storage_estimation_tasks(const ObTableScanSpec &tsc_spec, + const ObCostTableScanSimpleInfo &cost_tsc_info, + ObQueryRangeArray &ranges, ObIArray &tasks, + bool &is_single_part, int64_t &part_cnt); + int add_estimation_tasks(const ObTableScanSpec &tsc_spec, + const ObCostTableScanSimpleInfo &cost_tsc_info, + const int64_t schema_version, ObDASTabletLoc *tablet_loc, + ObQueryRangeArray &ranges, ObIArray &tasks); + int construct_scan_range_batch(ObIAllocator &allocator, const ObQueryRangeArray &scan_ranges, + ObSimpleBatch &batch); + int do_storage_estimation(ObBatchEstTasks &tasks); + int do_storage_estimation(ObIArray &tasks, bool &res_reliable); + int calculate_tsc_auto_dop(const ObIArray &tasks, + const ObCostTableScanSimpleInfo &cost_tsc_info, int64_t part_cnt, + int64_t &table_dop); + int get_task(ObIArray &tasks, const ObAddr &addr, ObBatchEstTasks *&task); + int choose_storage_estimation_partitions(const int64_t partition_limit, + const DASTabletLocSEArray &tablet_locs, + DASTabletLocSEArray &chosen_tablet_locs); + +private: + ObExecContext &ctx_; +}; + +} // namespace common +} // namespace oceanbase + +#endif // OCEANBASE_SQL_PLAN_CACHE_OB_ADAPTIVE_AUTO_DOP_ diff --git a/src/sql/plan_cache/ob_plan_cache_struct.h b/src/sql/plan_cache/ob_plan_cache_struct.h index 1cd69d67af..0f6394864a 100644 --- a/src/sql/plan_cache/ob_plan_cache_struct.h +++ b/src/sql/plan_cache/ob_plan_cache_struct.h @@ -403,7 +403,8 @@ struct ObPlanCacheCtx : public ObILibCacheCtx insert_batch_opt_info_(allocator), is_max_curr_limit_(false), is_batch_insert_opt_(false), - is_arraybinding_(false) + is_arraybinding_(false), + exist_local_plan_(false) { fp_result_.pc_key_.mode_ = mode_; } @@ -479,7 +480,8 @@ struct ObPlanCacheCtx : public ObILibCacheCtx K(insert_batch_opt_info_), K(is_max_curr_limit_), K(is_batch_insert_opt_), - K(is_arraybinding_) + K(is_arraybinding_), + K(exist_local_plan_) ); PlanCacheMode mode_; //control use which variables to do match @@ -545,6 +547,7 @@ struct ObPlanCacheCtx : public ObILibCacheCtx bool is_batch_insert_opt_; bool is_arraybinding_; + bool exist_local_plan_; }; struct ObPlanCacheStat diff --git a/src/sql/plan_cache/ob_plan_cache_util.h b/src/sql/plan_cache/ob_plan_cache_util.h index 85315b1de3..4542ac181b 100644 --- a/src/sql/plan_cache/ob_plan_cache_util.h +++ b/src/sql/plan_cache/ob_plan_cache_util.h @@ -621,6 +621,8 @@ struct ObPlanStat common::ObString outline_data_; common::ObString hints_info_; bool hints_all_worked_; + bool is_inner_; + bool is_use_auto_dop_; ObPlanStat() @@ -697,7 +699,9 @@ struct ObPlanStat block_cache_miss_cnt_(0), pre_cal_expr_handler_(NULL), plan_hash_value_(0), - hints_all_worked_(true) + hints_all_worked_(true), + is_inner_(false), + is_use_auto_dop_(false) { exact_mode_sql_id_[0] = '\0'; } @@ -775,7 +779,9 @@ struct ObPlanStat block_cache_miss_cnt_(rhs.block_cache_miss_cnt_), pre_cal_expr_handler_(rhs.pre_cal_expr_handler_), plan_hash_value_(rhs.plan_hash_value_), - hints_all_worked_(rhs.hints_all_worked_) + hints_all_worked_(rhs.hints_all_worked_), + is_inner_(rhs.is_inner_), + is_use_auto_dop_(rhs.is_use_auto_dop_) { exact_mode_sql_id_[0] = '\0'; MEMCPY(plan_sel_info_str_, rhs.plan_sel_info_str_, rhs.plan_sel_info_str_len_); diff --git a/src/sql/plan_cache/ob_plan_set.cpp b/src/sql/plan_cache/ob_plan_set.cpp index fa12f2124b..a584fc4522 100644 --- a/src/sql/plan_cache/ob_plan_set.cpp +++ b/src/sql/plan_cache/ob_plan_set.cpp @@ -34,6 +34,7 @@ #include "pl/ob_pl.h" #include "ob_plan_set.h" #include "share/resource_manager/ob_resource_manager.h" +#include "sql/plan_cache/ob_adaptive_auto_dop.h" using namespace oceanbase; using namespace common; @@ -1249,6 +1250,7 @@ int ObSqlPlanSet::add_plan(ObPhysicalPlan &plan, array_binding_plan_ = &plan; } } else { + is_single_table_ = (1 == sql_ctx.partition_infos_.count()); if (OB_FAIL(add_physical_plan(OB_PHY_PLAN_LOCAL, pc_ctx, plan))) { SQL_PC_LOG(TRACE, "fail to add local plan", K(ret)); // } else if (OB_SUCC(ret) @@ -1274,6 +1276,7 @@ int ObSqlPlanSet::add_plan(ObPhysicalPlan &plan, } } break; case OB_PHY_PLAN_DISTRIBUTED: { + is_single_table_ = (1 == sql_ctx.partition_infos_.count()); SQL_PC_LOG(TRACE, "plan set add plan, distr plan", K(ret)); if (OB_FAIL(add_physical_plan(OB_PHY_PLAN_DISTRIBUTED, pc_ctx, plan))) { LOG_WARN("failed to add dist plan", K(ret), K(plan)); @@ -1372,6 +1375,13 @@ int ObSqlPlanSet::init_new_set(const ObPlanCacheCtx &pc_ctx, LOG_DEBUG("contain virtual table", K(is_contain_virtual_table_), K(schema_obj)); } } // for end + for (int64_t i = 0; !is_contain_inner_table_ && i < plan.get_dependency_table().count(); i++) { + const ObSchemaObjVersion &schema_obj = plan.get_dependency_table().at(i); + if (is_inner_table(schema_obj.object_id_)) { + is_contain_inner_table_ = true; + LOG_DEBUG("contain virtual table", K(is_contain_inner_table_), K(schema_obj)); + } + } // for end } bool contain_index_location = false; @@ -1886,22 +1896,42 @@ int ObSqlPlanSet::try_get_local_plan(ObPlanCacheCtx &pc_ctx, int ret = OB_SUCCESS; plan = NULL; get_next = false; + ObExecContext &exec_ctx = pc_ctx.exec_ctx_; ObPhyPlanType real_type = OB_PHY_PLAN_UNINITIALIZED; ObSEArray candi_table_locs; if (OB_ISNULL(local_plan_)) { LOG_DEBUG("local plan is null"); get_next = true; - } else if (FALSE_IT(plan = local_plan_)) { - } else if (OB_FAIL(get_plan_type(plan->get_table_locations(), - plan->has_uncertain_local_operator(), - pc_ctx, - candi_table_locs, - real_type))) { - LOG_WARN("fail to get plan type", K(ret)); - } else if (OB_PHY_PLAN_LOCAL != real_type) { - LOG_DEBUG("not local plan", K(real_type)); - plan = NULL; - get_next = true; + } else { + pc_ctx.exist_local_plan_ = true; + if (FALSE_IT(plan = local_plan_)) { + } else if (OB_FAIL(get_plan_type(plan->get_table_locations(), + plan->has_uncertain_local_operator(), pc_ctx, candi_table_locs, + real_type))) { + LOG_WARN("fail to get plan type", K(ret)); + } else if (OB_PHY_PLAN_LOCAL != real_type) { + LOG_DEBUG("not local plan", K(real_type)); + plan = NULL; + get_next = true; + } else if (GCONF._enable_adaptive_auto_dop && is_single_table_ && !is_contain_inner_table_ + && !plan->stat_.is_inner_) { + int64_t dop = -1; + bool is_single_part = false; + ObAdaptiveAutoDop adaptive_auto_dop(exec_ctx); + AutoDopHashMap &auto_dop_map = exec_ctx.get_auto_dop_map(); + if (OB_FAIL(adaptive_auto_dop.calculate_table_auto_dop(*plan, auto_dop_map, is_single_part))) { + LOG_WARN("failed to calculate table auto dop", K(ret)); + } else if (OB_FAIL(auto_dop_map.get_refactored(0, dop))) { + LOG_WARN("failed to get refactored", K(ret)); + } else if (dop > 1) { + plan = NULL; + get_next = true; + } + if (OB_FAIL(ret)) { + auto_dop_map.clear(); + } + LOG_TRACE("adaptive px dop", K(ret), K(is_single_part), K(dop)); + } } if (OB_SUCC(ret) && NULL == plan) { get_next = true; @@ -1946,10 +1976,30 @@ int ObSqlPlanSet::try_get_dist_plan(ObPlanCacheCtx &pc_ctx, { int ret = OB_SUCCESS; plan = NULL; + ObExecContext &exec_ctx = pc_ctx.exec_ctx_; if (OB_FAIL(dist_plans_.get_plan(pc_ctx, plan))) { LOG_TRACE("failed to get dist plan", K(ret)); } else if (plan != NULL) { LOG_TRACE("succeed to get dist plan", K(*plan)); + if (GCONF._enable_adaptive_auto_dop && is_single_table_ && !is_contain_inner_table_ + && !plan->stat_.is_inner_) { + int64_t dop = -1; + bool is_single_part = false; + ObAdaptiveAutoDop adaptive_auto_dop(exec_ctx); + AutoDopHashMap &auto_dop_map = exec_ctx.get_auto_dop_map(); + if (OB_FAIL(adaptive_auto_dop.calculate_table_auto_dop(*plan, auto_dop_map, is_single_part))) { + LOG_WARN("failed to calculate table auto dop", K(ret)); + } else if (OB_FAIL(auto_dop_map.get_refactored(0, dop))) { + LOG_WARN("failed to get refactored", K(ret)); + } else if (is_single_part && plan->get_is_use_auto_dop() && !pc_ctx.exist_local_plan_ && dop <= 1) { + plan = NULL; + exec_ctx.set_force_gen_local_plan(); + } + if (OB_FAIL(ret)) { + auto_dop_map.clear(); + } + LOG_TRACE("adaptive px dop", K(ret), K(dop), K(is_single_part), K(pc_ctx.exist_local_plan_)); + } } if (OB_SQL_PC_NOT_EXIST == ret) { ret = OB_SUCCESS; @@ -2020,6 +2070,7 @@ void ObSqlPlanSet::reset() has_duplicate_table_ = false; //has_array_binding_ = false; is_contain_virtual_table_ = false; + is_contain_inner_table_ = false; enable_inner_part_parallel_exec_ = false; table_locations_.reset(); if (OB_ISNULL(plan_cache_value_) diff --git a/src/sql/plan_cache/ob_plan_set.h b/src/sql/plan_cache/ob_plan_set.h index 23442d4ef3..a7b9f02e20 100644 --- a/src/sql/plan_cache/ob_plan_set.h +++ b/src/sql/plan_cache/ob_plan_set.h @@ -319,7 +319,9 @@ public: has_duplicate_table_(false), //has_array_binding_(false), is_contain_virtual_table_(false), - enable_inner_part_parallel_exec_(false) + enable_inner_part_parallel_exec_(false), + is_single_table_(false), + is_contain_inner_table_(false) { } @@ -453,6 +455,8 @@ private: bool is_contain_virtual_table_; // px并行度是否大于1 bool enable_inner_part_parallel_exec_; + bool is_single_table_; + bool is_contain_inner_table_; }; inline ObPlanSetType ObPlanSet::get_plan_set_type_by_cache_obj_type(ObLibCacheNameSpace ns) diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index da0719eeb2..d68e42f7ce 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -313,6 +313,7 @@ _data_storage_io_timeout _delay_resource_recycle_after_correctness_issue _display_mysql_version _enable_active_txn_transfer +_enable_adaptive_auto_dop _enable_adaptive_compaction _enable_adaptive_merge_schedule _enable_add_fulltext_index_to_existing_table