From 62a513621acfa7db9059caf0f59d04647ee4cbdd Mon Sep 17 00:00:00 2001 From: chimyue Date: Tue, 8 Oct 2024 18:16:03 +0000 Subject: [PATCH] [CP] support batch load spm baseline --- src/observer/ob_rpc_processor_simple.cpp | 4 +- src/pl/ob_pl_interface_pragma.h | 1 + .../code_generator/ob_static_engine_cg.cpp | 2 + src/sql/engine/ob_physical_plan.cpp | 4 + src/sql/engine/ob_physical_plan.h | 14 ++- src/sql/plan_cache/ob_plan_cache.cpp | 112 ++++++++++++------ src/sql/plan_cache/ob_plan_cache.h | 7 +- src/sql/spm/ob_plan_baseline_mgr.h | 32 ++++- src/sql/spm/ob_plan_baseline_sql_service.h | 12 +- src/sql/spm/ob_spm_controller.h | 2 +- 10 files changed, 145 insertions(+), 45 deletions(-) diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index 2fb8fee8f..4b0afe8d1 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -2873,7 +2873,7 @@ int ObServerCancelEvolveTaskP::process() MTL_SWITCH(arg.tenant_id_) { plan_cache = MTL(ObPlanCache*); if (evict_baseline && OB_FAIL(plan_cache-> - cache_evict_baseline_by_sql_id(arg.database_id_, arg.sql_id_))) { + cache_evict_baseline(arg.database_id_, arg.sql_id_))) { LOG_WARN("failed to evict baseline by sql id", K(ret)); } else if (evict_plan && OB_FAIL(plan_cache-> cache_evict_plan_by_sql_id(arg.database_id_, arg.sql_id_))) { @@ -2909,7 +2909,7 @@ int ObLoadBaselineV2P::process() MTL_SWITCH(arg_.tenant_id_) { ObPlanCache *plan_cache = MTL(ObPlanCache*); uint64_t load_count = 0; - if (OB_INVALID_ID == arg_.tenant_id_ || arg_.sql_id_.empty()) { // load appointed tenant cache + if (OB_UNLIKELY(OB_INVALID_ID == arg_.tenant_id_)) { // load appointed tenant cache ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K_(arg), K(ret)); } else if (OB_FAIL(plan_cache->load_plan_baseline(arg_, load_count))) { diff --git a/src/pl/ob_pl_interface_pragma.h b/src/pl/ob_pl_interface_pragma.h index e8e93aa76..53670f1df 100644 --- 
a/src/pl/ob_pl_interface_pragma.h +++ b/src/pl/ob_pl_interface_pragma.h @@ -596,6 +596,7 @@ INTERFACE_DEF(INTERFACE_DBMS_SPM_CONFIGURE, "CONFIGURE", (ObDBMSSpm::configure)) INTERFACE_DEF(INTERFACE_DBMS_SPM_DROP_SQL_PLAN_BASELINE, "DROP_SQL_PLAN_BASELINE", (ObDBMSSpm::drop_baseline)) INTERFACE_DEF(INTERFACE_DBMS_SPM_LOAD_PLANS_FROM_CURSOR_CACHE, "LOAD_PLANS_FROM_CURSOR_CACHE", (ObDBMSSpm::load_plans_from_cursor_cache)) + INTERFACE_DEF(INTERFACE_DBMS_SPM_BATCH_LOAD_PLANS_FROM_CURSOR_CACHE, "BATCH_LOAD_PLANS_FROM_CURSOR_CACHE", (ObDBMSSpm::batch_load_plans_from_cursor_cache)) INTERFACE_DEF(INTERFACE_DBMS_SPM_AUTO_PURGE_SQL_PLAN_BASELINE, "AUTO_PURGE_SQL_PLAN_BASELINE", (ObDBMSSpm::auto_purge_sql_plan_baseline)) // end of dbms_spm #endif diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 10ec97bcd..3da70201c 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -8649,6 +8649,8 @@ int ObStaticEngineCG::set_other_properties(const ObLogPlan &log_plan, ObPhysical phy_plan.set_minimal_worker_count(log_plan.get_optimizer_context().get_minimal_worker_count()); phy_plan.set_is_batched_multi_stmt(log_plan.get_optimizer_context().is_batched_multi_stmt()); phy_plan.set_need_consistent_snapshot(log_plan.need_consistent_read()); + phy_plan.set_is_inner_sql(my_session->is_inner()); + phy_plan.set_is_batch_params_execute(sql_ctx->is_batch_params_execute()); // only if all servers's version >= CLUSTER_VERSION_4_2_0_0 if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0) { phy_plan.set_enable_px_fast_reclaim(GCONF._enable_px_fast_reclaim); diff --git a/src/sql/engine/ob_physical_plan.cpp b/src/sql/engine/ob_physical_plan.cpp index 381e6d716..51e558596 100644 --- a/src/sql/engine/ob_physical_plan.cpp +++ b/src/sql/engine/ob_physical_plan.cpp @@ -137,6 +137,8 @@ ObPhysicalPlan::ObPhysicalPlan(MemoryContext &mem_context /* = CURRENT_CONTEXT * 
use_rich_format_(false), subschema_ctx_(allocator_), disable_auto_memory_mgr_(false), + is_inner_sql_(false), + is_batch_params_execute_(false), all_local_session_vars_(&allocator_), udf_has_dml_stmt_(false), mview_ids_(&allocator_), @@ -245,6 +247,8 @@ void ObPhysicalPlan::reset() subschema_ctx_.reset(); all_local_session_vars_.reset(); udf_has_dml_stmt_ = false; + is_inner_sql_ = false; + is_batch_params_execute_ = false; mview_ids_.reset(); enable_inc_direct_load_ = false; enable_replace_ = false; diff --git a/src/sql/engine/ob_physical_plan.h b/src/sql/engine/ob_physical_plan.h index 482fb9a28..777f1a185 100644 --- a/src/sql/engine/ob_physical_plan.h +++ b/src/sql/engine/ob_physical_plan.h @@ -466,11 +466,20 @@ public: inline bool is_insert_select() const { return is_insert_select_; } inline void set_is_plain_insert(bool v) { is_plain_insert_ = v; } inline bool is_plain_insert() const { return is_plain_insert_; } + inline void set_is_inner_sql(bool v) { is_inner_sql_ = v; } + inline void set_is_batch_params_execute(bool v) { is_batch_params_execute_ = v; } inline bool is_dml_write_stmt() const { return ObStmt::is_dml_write_stmt(stmt_type_); } inline bool should_add_baseline() const { return (ObStmt::is_dml_stmt(stmt_type_) && (stmt::T_INSERT != stmt_type_ || is_insert_select_) - && (stmt::T_REPLACE != stmt_type_ || is_insert_select_)); + && (stmt::T_REPLACE != stmt_type_ || is_insert_select_) + // TODO:@yibo skip SPM for inner sql for now? SQL executed inside PL is also inner sql, + && !is_inner_sql_ + && !is_batch_params_execute_ + // TODO:@yibo batch multi stmt relies on get_plan to init some structure. But spm may not enter + // get_plan. Now we disable spm when batch multi stmt exists. 
+ && !is_remote_plan() + && is_dep_base_table()); } inline bool is_plain_select() const { @@ -722,7 +731,8 @@ public: bool use_rich_format_; ObSubSchemaCtx subschema_ctx_; bool disable_auto_memory_mgr_; - + bool is_inner_sql_; + bool is_batch_params_execute_; private: common::ObFixedArray all_local_session_vars_; public: diff --git a/src/sql/plan_cache/ob_plan_cache.cpp b/src/sql/plan_cache/ob_plan_cache.cpp index 331ba2ca2..b058ebef4 100644 --- a/src/sql/plan_cache/ob_plan_cache.cpp +++ b/src/sql/plan_cache/ob_plan_cache.cpp @@ -41,6 +41,7 @@ #include "sql/spm/ob_spm_define.h" #include "sql/spm/ob_spm_controller.h" #include "sql/spm/ob_spm_evolution_plan.h" +#include "sql/spm/ob_plan_baseline_mgr.h" #endif #include "pl/pl_cache/ob_pl_cache_mgr.h" #include "sql/plan_cache/ob_values_table_compression.h" @@ -55,13 +56,14 @@ namespace oceanbase { namespace sql { -struct ObGetPlanIdBySqlIdOp +struct ObGetCandiBaselinePlanIdOp { - explicit ObGetPlanIdBySqlIdOp(common::ObIArray *key_array, + explicit ObGetCandiBaselinePlanIdOp(common::ObIArray *key_array, + uint64_t db_id, const common::ObString &sql_id, const bool with_plan_hash, const uint64_t &plan_hash_value) - : key_array_(key_array), sql_id_(sql_id), with_plan_hash_(with_plan_hash), plan_hash_value_(plan_hash_value) + : key_array_(key_array), db_id_(db_id), sql_id_(sql_id), with_plan_hash_(with_plan_hash), plan_hash_value_(plan_hash_value) { } int operator()(common::hash::HashMapPair &entry) @@ -77,7 +79,9 @@ struct ObGetPlanIdBySqlIdOp } else if (OB_ISNULL(plan = dynamic_cast(entry.second))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected null plan", K(ret), K(plan)); - } else if (sql_id_ != plan->stat_.sql_id_) { + } else if (!plan->should_add_baseline() || plan->stat_.constructed_sql_.empty()) { + // do nothing + } else if (!sql_id_.empty() && (db_id_ != plan->stat_.db_id_ || sql_id_ != plan->stat_.sql_id_)) { // do nothing } else if (with_plan_hash_ && plan->stat_.plan_hash_value_ != plan_hash_value_) { // do 
nothing @@ -89,6 +93,7 @@ struct ObGetPlanIdBySqlIdOp } common::ObIArray *key_array_; + uint64_t db_id_; common::ObString sql_id_; bool with_plan_hash_; uint64_t plan_hash_value_; @@ -160,12 +165,12 @@ struct ObGetKVEntryBySQLIDOp : public ObKVEntryTraverseOp }; #ifdef OB_BUILD_SPM -struct ObGetPlanBaselineBySQLIDOp : public ObKVEntryTraverseOp +struct ObGetPlanBaselineOp : public ObKVEntryTraverseOp { - explicit ObGetPlanBaselineBySQLIDOp(uint64_t db_id, - common::ObString sql_id, - LCKeyValueArray *key_val_list, - const CacheRefHandleID ref_handle) + explicit ObGetPlanBaselineOp(uint64_t db_id, + common::ObString sql_id, + LCKeyValueArray *key_val_list, + const CacheRefHandleID ref_handle) : ObKVEntryTraverseOp(key_val_list, ref_handle), db_id_(db_id), sql_id_(sql_id) @@ -177,7 +182,9 @@ struct ObGetPlanBaselineBySQLIDOp : public ObKVEntryTraverseOp is_match = false; if (ObLibCacheNameSpace::NS_SPM == entry.first->namespace_) { ObBaselineKey *key = static_cast(entry.first); - if (db_id_ != common::OB_INVALID_ID && db_id_ != key->db_id_) { + if (sql_id_.empty()) { + is_match = true; + } else if (db_id_ != common::OB_INVALID_ID && db_id_ != key->db_id_) { // skip entry that has non-matched db_id } else if (sql_id_ == key->sql_id_) { is_match = true; @@ -1402,12 +1409,12 @@ int ObPlanCache::cache_evict_plan_by_sql_id(uint64_t db_id, common::ObString sql } #ifdef OB_BUILD_SPM -int ObPlanCache::cache_evict_baseline_by_sql_id(uint64_t db_id, common::ObString sql_id) +int ObPlanCache::cache_evict_baseline(uint64_t db_id, common::ObString sql_id) { int ret = OB_SUCCESS; SQL_PC_LOG(TRACE, "cache evict plan baseline by sql id start"); LCKeyValueArray to_evict_keys; - ObGetPlanBaselineBySQLIDOp get_ids_op(db_id, sql_id, &to_evict_keys, PLAN_BASELINE_HANDLE); + ObGetPlanBaselineOp get_ids_op(db_id, sql_id, &to_evict_keys, PLAN_BASELINE_HANDLE); if (OB_FAIL(foreach_cache_evict(get_ids_op))) { SQL_PC_LOG(WARN, "failed to foreach cache evict", K(ret)); } @@ -1589,33 +1596,72 @@ 
int ObPlanCache::cache_evict_by_glitch_node() int ObPlanCache::load_plan_baseline(const obrpc::ObLoadPlanBaselineArg &arg, uint64_t &load_count) { int ret = OB_SUCCESS; - common::ObSEArray plan_ids; ObGlobalReqTimeService::check_req_timeinfo(); - ObGetPlanIdBySqlIdOp plan_id_op(&plan_ids, arg.sql_id_, arg.with_plan_hash_, arg.plan_hash_value_); load_count = 0; - if (OB_FAIL(co_mgr_.foreach_cache_obj(plan_id_op))) { - LOG_WARN("fail to traverse id2stat_map", K(ret)); - } else { - ObPhysicalPlan *plan = NULL; - LOG_INFO("load plan baseline by sql ids", K(arg), K(plan_ids)); - for (int64_t i = 0; i < plan_ids.count(); i++) { - uint64_t plan_id= plan_ids.at(i); - ObCacheObjGuard guard(LOAD_BASELINE_HANDLE); - int tmp_ret = ref_plan(plan_id, guard); //plan引用计数加1 - plan = static_cast(guard.cache_obj_); - if (OB_HASH_NOT_EXIST == tmp_ret) { - //do nothing; - } else if (OB_SUCCESS != tmp_ret || NULL == plan) { - LOG_WARN("get plan failed", K(tmp_ret), KP(plan)); - } else { - LOG_INFO("load plan baseline by sql id", K(arg)); - if (OB_FAIL(ObSpmController::load_baseline(arg, plan))) { - LOG_WARN("failed to load baseline", K(ret)); + SMART_VAR(PlanIdArray, plan_ids) { + int64_t batch_exec_cnt = 0; + ObGetCandiBaselinePlanIdOp plan_id_op(&plan_ids, arg.database_id_, arg.sql_id_, arg.with_plan_hash_, arg.plan_hash_value_); + if (OB_FAIL(co_mgr_.foreach_cache_obj(plan_id_op))) { + SERVER_LOG(WARN, "fail to traverse id2stat_map"); + } else { + int64_t pos = 0; + while (OB_SUCC(ret) && pos < plan_ids.count()) { + uint64_t tmp_load_count = 0; + if (OB_FAIL(batch_load_plan_baseline(arg, plan_ids, pos, tmp_load_count))) { + LOG_WARN("failed to batch load plan baseline", K(ret)); } else { - ++load_count; + load_count += tmp_load_count; + ++batch_exec_cnt; } } } + if (arg.sql_id_.empty()) { + LOG_INFO("batch load plan baseline", K(load_count), K(batch_exec_cnt), K(plan_ids.count()), K(arg)); + } else { + LOG_INFO("load plan baseline by sql ids", K(load_count), K(batch_exec_cnt), 
K(plan_ids), K(arg)); + } + } + return ret; +} + +int ObPlanCache::batch_load_plan_baseline(const obrpc::ObLoadPlanBaselineArg &arg, + const PlanIdArray &plan_ids, + int64_t &pos, + uint64_t &load_count) +{ + int ret = OB_SUCCESS; + load_count = 0; + ObSpmBaselineLoader baseline_loader; + if (OB_UNLIKELY(pos < 0 || pos >= plan_ids.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected pos", K(ret), K(pos), K(plan_ids.count())); + } else if (OB_FAIL(baseline_loader.init_baseline_loader(arg))) { + LOG_WARN("failed to init baseline loader", K(ret)); + } else { + bool need_add_next = true; + while (OB_SUCC(ret) && pos < plan_ids.count() && need_add_next) { + ObCacheObjGuard guard(LOAD_BASELINE_HANDLE); + int tmp_ret = ref_plan(plan_ids.at(pos), guard); // increments the plan's reference count by 1 + ObPhysicalPlan *plan = static_cast(guard.cache_obj_); + if (OB_HASH_NOT_EXIST == tmp_ret) { + ++pos; + } else if (OB_SUCCESS != tmp_ret || NULL == plan) { + ++pos; + LOG_WARN("get plan failed", K(tmp_ret), KP(plan)); + } else if (OB_FAIL(baseline_loader.add_one_plan_baseline(*plan, need_add_next))) { + LOG_WARN("failed to add one plan baseline", K(ret)); + } else if (need_add_next) { + ++pos; + } + } + + if (OB_FAIL(ret) || 0 >= baseline_loader.get_baseline_count()) { + } else if (OB_FAIL(ObSpmController::load_baseline(baseline_loader))) { + LOG_WARN("failed to load baseline", K(ret)); + } else { + load_count = baseline_loader.get_baseline_count(); + + } } return ret; } diff --git a/src/sql/plan_cache/ob_plan_cache.h b/src/sql/plan_cache/ob_plan_cache.h index 4f45cb64d..4fe12ab33 100644 --- a/src/sql/plan_cache/ob_plan_cache.h +++ b/src/sql/plan_cache/ob_plan_cache.h @@ -55,6 +55,7 @@ class ObILibCacheObject; class ObPhysicalPlan; class ObLibCacheAtomicOp; class ObEvolutionPlan; +class ObSpmBaselineLoader; typedef common::hash::ObHashMap PlanCacheMap; #ifdef OB_BUILD_SPM @@ -361,10 +362,14 @@ public: template int foreach_cache_evict(CallBack &cb); #ifdef OB_BUILD_SPM - int 
cache_evict_baseline_by_sql_id(uint64_t db_id, common::ObString sql_id); + int cache_evict_baseline(uint64_t db_id, common::ObString sql_id); // load plan baseline from plan cache // int load_plan_baseline(); int load_plan_baseline(const obrpc::ObLoadPlanBaselineArg &arg, uint64_t &load_count); + int batch_load_plan_baseline(const obrpc::ObLoadPlanBaselineArg &arg, + const PlanIdArray &plan_ids, + int64_t &pos, + uint64_t &load_count); int check_baseline_finish(); #endif void destroy(); diff --git a/src/sql/spm/ob_plan_baseline_mgr.h b/src/sql/spm/ob_plan_baseline_mgr.h index 66b3522e2..3fadd7077 100644 --- a/src/sql/spm/ob_plan_baseline_mgr.h +++ b/src/sql/spm/ob_plan_baseline_mgr.h @@ -41,6 +41,36 @@ private: lib::MemoryContext mem_context_; bool inited_; }; + +class ObSpmBaselineLoader +{ +public: + ObSpmBaselineLoader() + : tenant_id_(0), + alloc_guard_(), + baseline_count_(0), + origin_(0), + flags_(0) + {} + virtual ~ObSpmBaselineLoader() {} + int init_baseline_loader(const obrpc::ObLoadPlanBaselineArg &arg); + int add_one_plan_baseline(const ObPhysicalPlan &plan, bool &added); + int get_baseline_item_dml(ObSqlString &item_dml); + int get_baseline_info_dml(ObSqlString &info_dml); + inline uint64_t get_baseline_count() { return baseline_count_; } + ObIAllocator *get_allocator() { return alloc_guard_.get_allocator(); } +private: + uint64_t tenant_id_; + SpmTmpAllocatorGuard alloc_guard_; + share::ObDMLSqlSplicer item_dml_splicer_; + share::ObDMLSqlSplicer info_dml_splicer_; + uint64_t baseline_count_; + // info same as ObPlanBaselineItem + int64_t origin_; // baseline source, 1 for AUTO-CAPTURE, 2 for MANUAL-LOAD + common::ObString db_version_; // database version when generate baseline + int64_t flags_; +}; + class ObPlanBaselineRefreshTask : public common::ObTimerTask { public: @@ -102,7 +132,7 @@ public: const uint64_t plan_hash, const bool with_plan_hash, int64_t &baseline_affected); - int load_baseline(ObBaselineKey &key, ObPhysicalPlan* plan, const bool 
fixed, const bool enabled); + int load_baseline(ObSpmBaselineLoader &baseline_loader); int purge_baselines(const uint64_t tenant_id, int64_t baseline_affected); int evict_plan_baseline(ObSpmCacheCtx& spm_ctx); int check_evolution_task(); diff --git a/src/sql/spm/ob_plan_baseline_sql_service.h b/src/sql/spm/ob_plan_baseline_sql_service.h index b127119f7..714bfa3f9 100644 --- a/src/sql/spm/ob_plan_baseline_sql_service.h +++ b/src/sql/spm/ob_plan_baseline_sql_service.h @@ -29,6 +29,7 @@ class ObMySQLProxy; namespace sql { +class ObSpmBaselineLoader; class ObPlanBaselineSqlService { @@ -38,6 +39,7 @@ public: ~ObPlanBaselineSqlService() {} int init(ObMySQLProxy *proxy); + int load_plan_baseline(const uint64_t tenant_id, ObSpmBaselineLoader &baseline_loader); int update_baseline_item(ObMySQLTransaction& trans, ObIAllocator& allocator, const uint64_t tenant_id, @@ -108,11 +110,11 @@ public: int spm_configure(const uint64_t tenant_id, const uint64_t database_id, const ObString& param_name, const int64_t& param_value); int purge_baselines(const uint64_t tenant_id, const uint64_t current_time, int64_t &baseline_affected); - int convert_sql_string(ObIAllocator &allocator, - const ObCollationType input_collation, - const ObString &input_str, - bool truncate_str, - ObString &output_str); + static int convert_sql_string(ObIAllocator &allocator, + const ObCollationType input_collation, + const ObString &input_str, + bool truncate_str, + ObString &output_str); int update_plan_baselines_result(const uint64_t tenant_id, EvoResultUpdateTask& evo_res); diff --git a/src/sql/spm/ob_spm_controller.h b/src/sql/spm/ob_spm_controller.h index 35b97ed8f..5421ad1d8 100644 --- a/src/sql/spm/ob_spm_controller.h +++ b/src/sql/spm/ob_spm_controller.h @@ -40,7 +40,7 @@ public: static int accept_plan_baseline_by_user(obrpc::ObModifyPlanBaselineArg& arg); static int cancel_evolve_task(obrpc::ObModifyPlanBaselineArg& arg); - static int load_baseline(const obrpc::ObLoadPlanBaselineArg& arg, 
ObPhysicalPlan* plan); + static int load_baseline(ObSpmBaselineLoader &baseline_loader); static int deny_new_plan_as_baseline(ObSpmCacheCtx& spm_ctx); static int64_t calc_spm_timeout_us(const int64_t normal_timeout, const int64_t baseline_exec_time); };