[CP] support batch load spm baseline

This commit is contained in:
chimyue 2024-10-08 18:16:03 +00:00 committed by ob-robot
parent b4e818c3e7
commit 62a513621a
10 changed files with 145 additions and 45 deletions

View File

@ -2873,7 +2873,7 @@ int ObServerCancelEvolveTaskP::process()
MTL_SWITCH(arg.tenant_id_) {
plan_cache = MTL(ObPlanCache*);
if (evict_baseline && OB_FAIL(plan_cache->
cache_evict_baseline_by_sql_id(arg.database_id_, arg.sql_id_))) {
cache_evict_baseline(arg.database_id_, arg.sql_id_))) {
LOG_WARN("failed to evict baseline by sql id", K(ret));
} else if (evict_plan && OB_FAIL(plan_cache->
cache_evict_plan_by_sql_id(arg.database_id_, arg.sql_id_))) {
@ -2909,7 +2909,7 @@ int ObLoadBaselineV2P::process()
MTL_SWITCH(arg_.tenant_id_) {
ObPlanCache *plan_cache = MTL(ObPlanCache*);
uint64_t load_count = 0;
if (OB_INVALID_ID == arg_.tenant_id_ || arg_.sql_id_.empty()) { // load appointed tenant cache
if (OB_UNLIKELY(OB_INVALID_ID == arg_.tenant_id_)) { // load appointed tenant cache
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K_(arg), K(ret));
} else if (OB_FAIL(plan_cache->load_plan_baseline(arg_, load_count))) {

View File

@ -596,6 +596,7 @@
INTERFACE_DEF(INTERFACE_DBMS_SPM_CONFIGURE, "CONFIGURE", (ObDBMSSpm::configure))
INTERFACE_DEF(INTERFACE_DBMS_SPM_DROP_SQL_PLAN_BASELINE, "DROP_SQL_PLAN_BASELINE", (ObDBMSSpm::drop_baseline))
INTERFACE_DEF(INTERFACE_DBMS_SPM_LOAD_PLANS_FROM_CURSOR_CACHE, "LOAD_PLANS_FROM_CURSOR_CACHE", (ObDBMSSpm::load_plans_from_cursor_cache))
INTERFACE_DEF(INTERFACE_DBMS_SPM_BATCH_LOAD_PLANS_FROM_CURSOR_CACHE, "BATCH_LOAD_PLANS_FROM_CURSOR_CACHE", (ObDBMSSpm::batch_load_plans_from_cursor_cache))
INTERFACE_DEF(INTERFACE_DBMS_SPM_AUTO_PURGE_SQL_PLAN_BASELINE, "AUTO_PURGE_SQL_PLAN_BASELINE", (ObDBMSSpm::auto_purge_sql_plan_baseline))
// end of dbms_spm
#endif

View File

@ -8649,6 +8649,8 @@ int ObStaticEngineCG::set_other_properties(const ObLogPlan &log_plan, ObPhysical
phy_plan.set_minimal_worker_count(log_plan.get_optimizer_context().get_minimal_worker_count());
phy_plan.set_is_batched_multi_stmt(log_plan.get_optimizer_context().is_batched_multi_stmt());
phy_plan.set_need_consistent_snapshot(log_plan.need_consistent_read());
phy_plan.set_is_inner_sql(my_session->is_inner());
phy_plan.set_is_batch_params_execute(sql_ctx->is_batch_params_execute());
// only if all servers's version >= CLUSTER_VERSION_4_2_0_0
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_0_0) {
phy_plan.set_enable_px_fast_reclaim(GCONF._enable_px_fast_reclaim);

View File

@ -137,6 +137,8 @@ ObPhysicalPlan::ObPhysicalPlan(MemoryContext &mem_context /* = CURRENT_CONTEXT *
use_rich_format_(false),
subschema_ctx_(allocator_),
disable_auto_memory_mgr_(false),
is_inner_sql_(false),
is_batch_params_execute_(false),
all_local_session_vars_(&allocator_),
udf_has_dml_stmt_(false),
mview_ids_(&allocator_),
@ -245,6 +247,8 @@ void ObPhysicalPlan::reset()
subschema_ctx_.reset();
all_local_session_vars_.reset();
udf_has_dml_stmt_ = false;
is_inner_sql_ = false;
is_batch_params_execute_ = false;
mview_ids_.reset();
enable_inc_direct_load_ = false;
enable_replace_ = false;

View File

@ -466,11 +466,20 @@ public:
inline bool is_insert_select() const { return is_insert_select_; }
inline void set_is_plain_insert(bool v) { is_plain_insert_ = v; }
inline bool is_plain_insert() const { return is_plain_insert_; }
inline void set_is_inner_sql(bool v) { is_inner_sql_ = v; }
inline void set_is_batch_params_execute(bool v) { is_batch_params_execute_ = v; }
inline bool is_dml_write_stmt() const { return ObStmt::is_dml_write_stmt(stmt_type_); }
inline bool should_add_baseline() const {
return (ObStmt::is_dml_stmt(stmt_type_)
&& (stmt::T_INSERT != stmt_type_ || is_insert_select_)
&& (stmt::T_REPLACE != stmt_type_ || is_insert_select_));
&& (stmt::T_REPLACE != stmt_type_ || is_insert_select_)
// TODO:@yibo inner sql 先不用SPM? pl里面的执行的SQL也是inner sql,
&& !is_inner_sql_
&& !is_batch_params_execute_
// TODO:@yibo batch multi stmt relay get_plan to init some structure. But spm may not enter
// get_plan. Now we disable spm when batch multi stmt exists.
&& !is_remote_plan()
&& is_dep_base_table());
}
inline bool is_plain_select() const
{
@ -722,7 +731,8 @@ public:
bool use_rich_format_;
ObSubSchemaCtx subschema_ctx_;
bool disable_auto_memory_mgr_;
bool is_inner_sql_;
bool is_batch_params_execute_;
private:
common::ObFixedArray<ObLocalSessionVar, common::ObIAllocator> all_local_session_vars_;
public:

View File

@ -41,6 +41,7 @@
#include "sql/spm/ob_spm_define.h"
#include "sql/spm/ob_spm_controller.h"
#include "sql/spm/ob_spm_evolution_plan.h"
#include "sql/spm/ob_plan_baseline_mgr.h"
#endif
#include "pl/pl_cache/ob_pl_cache_mgr.h"
#include "sql/plan_cache/ob_values_table_compression.h"
@ -55,13 +56,14 @@ namespace oceanbase
{
namespace sql
{
struct ObGetPlanIdBySqlIdOp
struct ObGetCandiBaselinePlanIdOp
{
explicit ObGetPlanIdBySqlIdOp(common::ObIArray<uint64_t> *key_array,
explicit ObGetCandiBaselinePlanIdOp(common::ObIArray<uint64_t> *key_array,
uint64_t db_id,
const common::ObString &sql_id,
const bool with_plan_hash,
const uint64_t &plan_hash_value)
: key_array_(key_array), sql_id_(sql_id), with_plan_hash_(with_plan_hash), plan_hash_value_(plan_hash_value)
: key_array_(key_array), db_id_(db_id), sql_id_(sql_id), with_plan_hash_(with_plan_hash), plan_hash_value_(plan_hash_value)
{
}
int operator()(common::hash::HashMapPair<ObCacheObjID, ObILibCacheObject *> &entry)
@ -77,7 +79,9 @@ struct ObGetPlanIdBySqlIdOp
} else if (OB_ISNULL(plan = dynamic_cast<ObPhysicalPlan *>(entry.second))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null plan", K(ret), K(plan));
} else if (sql_id_ != plan->stat_.sql_id_) {
} else if (!plan->should_add_baseline() || plan->stat_.constructed_sql_.empty()) {
// do nothing
} else if (!sql_id_.empty() && (db_id_ != plan->stat_.db_id_ || sql_id_ != plan->stat_.sql_id_)) {
// do nothing
} else if (with_plan_hash_ && plan->stat_.plan_hash_value_ != plan_hash_value_) {
// do nothing
@ -89,6 +93,7 @@ struct ObGetPlanIdBySqlIdOp
}
common::ObIArray<uint64_t> *key_array_;
uint64_t db_id_;
common::ObString sql_id_;
bool with_plan_hash_;
uint64_t plan_hash_value_;
@ -160,12 +165,12 @@ struct ObGetKVEntryBySQLIDOp : public ObKVEntryTraverseOp
};
#ifdef OB_BUILD_SPM
struct ObGetPlanBaselineBySQLIDOp : public ObKVEntryTraverseOp
struct ObGetPlanBaselineOp : public ObKVEntryTraverseOp
{
explicit ObGetPlanBaselineBySQLIDOp(uint64_t db_id,
common::ObString sql_id,
LCKeyValueArray *key_val_list,
const CacheRefHandleID ref_handle)
explicit ObGetPlanBaselineOp(uint64_t db_id,
common::ObString sql_id,
LCKeyValueArray *key_val_list,
const CacheRefHandleID ref_handle)
: ObKVEntryTraverseOp(key_val_list, ref_handle),
db_id_(db_id),
sql_id_(sql_id)
@ -177,7 +182,9 @@ struct ObGetPlanBaselineBySQLIDOp : public ObKVEntryTraverseOp
is_match = false;
if (ObLibCacheNameSpace::NS_SPM == entry.first->namespace_) {
ObBaselineKey *key = static_cast<ObBaselineKey*>(entry.first);
if (db_id_ != common::OB_INVALID_ID && db_id_ != key->db_id_) {
if (sql_id_.empty()) {
is_match = true;
} else if (db_id_ != common::OB_INVALID_ID && db_id_ != key->db_id_) {
// skip entry that has non-matched db_id
} else if (sql_id_ == key->sql_id_) {
is_match = true;
@ -1402,12 +1409,12 @@ int ObPlanCache::cache_evict_plan_by_sql_id(uint64_t db_id, common::ObString sql
}
#ifdef OB_BUILD_SPM
int ObPlanCache::cache_evict_baseline_by_sql_id(uint64_t db_id, common::ObString sql_id)
int ObPlanCache::cache_evict_baseline(uint64_t db_id, common::ObString sql_id)
{
int ret = OB_SUCCESS;
SQL_PC_LOG(TRACE, "cache evict plan baseline by sql id start");
LCKeyValueArray to_evict_keys;
ObGetPlanBaselineBySQLIDOp get_ids_op(db_id, sql_id, &to_evict_keys, PLAN_BASELINE_HANDLE);
ObGetPlanBaselineOp get_ids_op(db_id, sql_id, &to_evict_keys, PLAN_BASELINE_HANDLE);
if (OB_FAIL(foreach_cache_evict(get_ids_op))) {
SQL_PC_LOG(WARN, "failed to foreach cache evict", K(ret));
}
@ -1589,33 +1596,72 @@ int ObPlanCache::cache_evict_by_glitch_node()
int ObPlanCache::load_plan_baseline(const obrpc::ObLoadPlanBaselineArg &arg, uint64_t &load_count)
{
int ret = OB_SUCCESS;
common::ObSEArray<uint64_t, 4> plan_ids;
ObGlobalReqTimeService::check_req_timeinfo();
ObGetPlanIdBySqlIdOp plan_id_op(&plan_ids, arg.sql_id_, arg.with_plan_hash_, arg.plan_hash_value_);
load_count = 0;
if (OB_FAIL(co_mgr_.foreach_cache_obj(plan_id_op))) {
LOG_WARN("fail to traverse id2stat_map", K(ret));
} else {
ObPhysicalPlan *plan = NULL;
LOG_INFO("load plan baseline by sql ids", K(arg), K(plan_ids));
for (int64_t i = 0; i < plan_ids.count(); i++) {
uint64_t plan_id= plan_ids.at(i);
ObCacheObjGuard guard(LOAD_BASELINE_HANDLE);
int tmp_ret = ref_plan(plan_id, guard); //plan引用计数加1
plan = static_cast<ObPhysicalPlan*>(guard.cache_obj_);
if (OB_HASH_NOT_EXIST == tmp_ret) {
//do nothing;
} else if (OB_SUCCESS != tmp_ret || NULL == plan) {
LOG_WARN("get plan failed", K(tmp_ret), KP(plan));
} else {
LOG_INFO("load plan baseline by sql id", K(arg));
if (OB_FAIL(ObSpmController::load_baseline(arg, plan))) {
LOG_WARN("failed to load baseline", K(ret));
SMART_VAR(PlanIdArray, plan_ids) {
int64_t batch_exec_cnt = 0;
ObGetCandiBaselinePlanIdOp plan_id_op(&plan_ids, arg.database_id_, arg.sql_id_, arg.with_plan_hash_, arg.plan_hash_value_);
if (OB_FAIL(co_mgr_.foreach_cache_obj(plan_id_op))) {
SERVER_LOG(WARN, "fail to traverse id2stat_map");
} else {
int64_t pos = 0;
while (OB_SUCC(ret) && pos < plan_ids.count()) {
uint64_t tmp_load_count = 0;
if (OB_FAIL(batch_load_plan_baseline(arg, plan_ids, pos, tmp_load_count))) {
LOG_WARN("failed to batch load plan baseline", K(ret));
} else {
++load_count;
load_count += tmp_load_count;
++batch_exec_cnt;
}
}
}
if (arg.sql_id_.empty()) {
LOG_INFO("batch load plan baseline", K(load_count), K(batch_exec_cnt), K(plan_ids.count()), K(arg));
} else {
LOG_INFO("load plan baseline by sql ids", K(load_count), K(batch_exec_cnt), K(plan_ids), K(arg));
}
}
return ret;
}
int ObPlanCache::batch_load_plan_baseline(const obrpc::ObLoadPlanBaselineArg &arg,
const PlanIdArray &plan_ids,
int64_t &pos,
uint64_t &load_count)
{
int ret = OB_SUCCESS;
load_count = 0;
ObSpmBaselineLoader baseline_loader;
if (OB_UNLIKELY(pos < 0 || pos >= plan_ids.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected pos", K(ret), K(pos), K(plan_ids.count()));
} else if (OB_FAIL(baseline_loader.init_baseline_loader(arg))) {
LOG_WARN("failed to init baseline loader", K(ret));
} else {
bool need_add_next = true;
while (OB_SUCC(ret) && pos < plan_ids.count() && need_add_next) {
ObCacheObjGuard guard(LOAD_BASELINE_HANDLE);
int tmp_ret = ref_plan(plan_ids.at(pos), guard); //plan引用计数加1
ObPhysicalPlan *plan = static_cast<ObPhysicalPlan*>(guard.cache_obj_);
if (OB_HASH_NOT_EXIST == tmp_ret) {
++pos;
} else if (OB_SUCCESS != tmp_ret || NULL == plan) {
++pos;
LOG_WARN("get plan failed", K(tmp_ret), KP(plan));
} else if (OB_FAIL(baseline_loader.add_one_plan_baseline(*plan, need_add_next))) {
LOG_WARN("failed to add one plan baseline", K(ret));
} else if (need_add_next) {
++pos;
}
}
if (OB_FAIL(ret) || 0 >= baseline_loader.get_baseline_count()) {
} else if (OB_FAIL(ObSpmController::load_baseline(baseline_loader))) {
LOG_WARN("failed to load baseline", K(ret));
} else {
load_count = baseline_loader.get_baseline_count();
}
}
return ret;
}

View File

@ -55,6 +55,7 @@ class ObILibCacheObject;
class ObPhysicalPlan;
class ObLibCacheAtomicOp;
class ObEvolutionPlan;
class ObSpmBaselineLoader;
typedef common::hash::ObHashMap<uint64_t, ObPlanCache *> PlanCacheMap;
#ifdef OB_BUILD_SPM
@ -361,10 +362,14 @@ public:
template<typename CallBack = ObKVEntryTraverseOp>
int foreach_cache_evict(CallBack &cb);
#ifdef OB_BUILD_SPM
int cache_evict_baseline_by_sql_id(uint64_t db_id, common::ObString sql_id);
int cache_evict_baseline(uint64_t db_id, common::ObString sql_id);
// load plan baseline from plan cache
// int load_plan_baseline();
int load_plan_baseline(const obrpc::ObLoadPlanBaselineArg &arg, uint64_t &load_count);
int batch_load_plan_baseline(const obrpc::ObLoadPlanBaselineArg &arg,
const PlanIdArray &plan_ids,
int64_t &pos,
uint64_t &load_count);
int check_baseline_finish();
#endif
void destroy();

View File

@ -41,6 +41,36 @@ private:
lib::MemoryContext mem_context_;
bool inited_;
};
class ObSpmBaselineLoader
{
public:
ObSpmBaselineLoader()
: tenant_id_(0),
alloc_guard_(),
baseline_count_(0),
origin_(0),
flags_(0)
{}
virtual ~ObSpmBaselineLoader() {}
int init_baseline_loader(const obrpc::ObLoadPlanBaselineArg &arg);
int add_one_plan_baseline(const ObPhysicalPlan &plan, bool &added);
int get_baseline_item_dml(ObSqlString &item_dml);
int get_baseline_info_dml(ObSqlString &info_dml);
inline uint64_t get_baseline_count() { return baseline_count_; }
ObIAllocator *get_allocator() { return alloc_guard_.get_allocator(); }
private:
uint64_t tenant_id_;
SpmTmpAllocatorGuard alloc_guard_;
share::ObDMLSqlSplicer item_dml_splicer_;
share::ObDMLSqlSplicer info_dml_splicer_;
uint64_t baseline_count_;
// info same as ObPlanBaselineItem
int64_t origin_; // baseline source, 1 for AUTO-CAPTURE, 2 for MANUAL-LOAD
common::ObString db_version_; // database version when generate baseline
int64_t flags_;
};
class ObPlanBaselineRefreshTask : public common::ObTimerTask
{
public:
@ -102,7 +132,7 @@ public:
const uint64_t plan_hash,
const bool with_plan_hash,
int64_t &baseline_affected);
int load_baseline(ObBaselineKey &key, ObPhysicalPlan* plan, const bool fixed, const bool enabled);
int load_baseline(ObSpmBaselineLoader &baseline_loader);
int purge_baselines(const uint64_t tenant_id, int64_t baseline_affected);
int evict_plan_baseline(ObSpmCacheCtx& spm_ctx);
int check_evolution_task();

View File

@ -29,6 +29,7 @@ class ObMySQLProxy;
namespace sql
{
class ObSpmBaselineLoader;
class ObPlanBaselineSqlService
{
@ -38,6 +39,7 @@ public:
~ObPlanBaselineSqlService() {}
int init(ObMySQLProxy *proxy);
int load_plan_baseline(const uint64_t tenant_id, ObSpmBaselineLoader &baseline_loader);
int update_baseline_item(ObMySQLTransaction& trans,
ObIAllocator& allocator,
const uint64_t tenant_id,
@ -108,11 +110,11 @@ public:
int spm_configure(const uint64_t tenant_id, const uint64_t database_id, const ObString& param_name, const int64_t& param_value);
int purge_baselines(const uint64_t tenant_id, const uint64_t current_time, int64_t &baseline_affected);
int convert_sql_string(ObIAllocator &allocator,
const ObCollationType input_collation,
const ObString &input_str,
bool truncate_str,
ObString &output_str);
static int convert_sql_string(ObIAllocator &allocator,
const ObCollationType input_collation,
const ObString &input_str,
bool truncate_str,
ObString &output_str);
int update_plan_baselines_result(const uint64_t tenant_id,
EvoResultUpdateTask& evo_res);

View File

@ -40,7 +40,7 @@ public:
static int accept_plan_baseline_by_user(obrpc::ObModifyPlanBaselineArg& arg);
static int cancel_evolve_task(obrpc::ObModifyPlanBaselineArg& arg);
static int load_baseline(const obrpc::ObLoadPlanBaselineArg& arg, ObPhysicalPlan* plan);
static int load_baseline(ObSpmBaselineLoader &baseline_loader);
static int deny_new_plan_as_baseline(ObSpmCacheCtx& spm_ctx);
static int64_t calc_spm_timeout_us(const int64_t normal_timeout, const int64_t baseline_exec_time);
};