[CP] fix bug when spm add plan to plan cache concurrently

This commit is contained in:
obdev 2024-07-16 14:55:25 +00:00 committed by ob-robot
parent 8d9b041d67
commit a003920857
5 changed files with 65 additions and 73 deletions

View File

@ -5150,15 +5150,25 @@ OB_NOINLINE int ObSql::handle_physical_plan(const ObString &trimed_stmt,
// constrain. In this situation, we can't compare the two plans, so just keep trying the next baseline.
spm_ctx.evolution_task_in_two_plan_set_ = false;
need_get_baseline = true;
} else if (spm_ctx.baseline_exists_) {
// When adding the baseline plan, if another session has already added it, stop searching for
// the next baseline and execute this plan directly.
need_get_baseline = false;
spm_ctx.baseline_exists_ = false;
} else {
// Adding the baseline plan failed; need to evict the unaccepted baseline from the baseline cache.
(void) ObSpmController::deny_new_plan_as_baseline(spm_ctx);
}
} else if (plan_added && ObSpmCacheCtx::SpmStat::STAT_ADD_BASELINE_PLAN == spm_ctx.spm_stat_) {
spm_ctx.spm_force_disable_ = true;
spm_ctx.spm_stat_ = ObSpmCacheCtx::STAT_FIRST_EXECUTE_PLAN;
spm_ctx.is_retry_for_spm_ = false;
ret = OB_SQL_RETRY_SPM;
if (nullptr != spm_ctx.baseline_guard_.get_cache_obj() &&
static_cast<ObPlanBaselineItem*>(spm_ctx.baseline_guard_.get_cache_obj())->get_fixed()) {
// fixed baseline plan, use it directly
} else {
spm_ctx.spm_force_disable_ = true;
spm_ctx.spm_stat_ = ObSpmCacheCtx::STAT_FIRST_EXECUTE_PLAN;
spm_ctx.is_retry_for_spm_ = false;
ret = OB_SQL_RETRY_SPM;
}
}
} else {
LOG_TRACE("spm need get baseline due to plan hash value not equal");

View File

@ -1252,13 +1252,10 @@ int ObSqlPlanSet::add_plan(ObPhysicalPlan &plan,
} else {
if (OB_FAIL(add_physical_plan(OB_PHY_PLAN_LOCAL, pc_ctx, plan))) {
SQL_PC_LOG(TRACE, "fail to add local plan", K(ret));
} else if (OB_SUCC(ret)
#ifdef OB_BUILD_SPM
&& is_spm_closed_
#endif
&& FALSE_IT(direct_local_plan_ = &plan)) {
// } else if (OB_SUCC(ret)
// && FALSE_IT(direct_local_plan_ = &plan)) {
// do nothing
} else {
// } else {
// local_phy_locations_.reset();
// if (OB_FAIL(init_phy_location(table_locs.count()))) {
// SQL_PC_LOG(WARN, "init phy location failed");
@ -1340,19 +1337,10 @@ int ObSqlPlanSet::init_new_set(const ObPlanCacheCtx &pc_ctx,
outline_param_idx_ = outline_param_idx;
need_try_plan_ = 0;
has_duplicate_table_ = false;
#ifdef OB_BUILD_SPM
int64_t spm_mode = 0;
#endif
const ObSQLSessionInfo *session_info = sql_ctx.session_info_;
if (OB_ISNULL(session_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid null plan cache or session info", K(ret), K(session_info));
#ifdef OB_BUILD_SPM
} else if (OB_FAIL(session_info->get_spm_mode(spm_mode))) {
LOG_WARN("failed to get spm mode", K(ret));
} else if (FALSE_IT(is_spm_closed_ = (0 == spm_mode))) {
// do nothing
#endif
} else if (OB_ISNULL(pc_malloc_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pc_allocator has not been initialized.", K(ret));
@ -1607,7 +1595,7 @@ int ObSqlPlanSet::add_physical_plan(const ObPhyPlanType plan_type,
LOG_WARN("failed to add dist plan", K(ret), K(plan));
}
#else
} else if (!pc_ctx.need_evolution_ || is_spm_closed_) {
} else if (!pc_ctx.need_evolution_) {
// Addition of other non-evolving plans is prohibited in plan evolution
if (OB_PHY_PLAN_LOCAL == plan_type) {
if (local_evolution_plan_.get_is_evolving_flag()) {
@ -1628,20 +1616,33 @@ int ObSqlPlanSet::add_physical_plan(const ObPhyPlanType plan_type,
ObSpmCacheCtx& spm_ctx = pc_ctx.sql_ctx_.spm_ctx_;
if (ObSpmCacheCtx::STAT_ADD_BASELINE_PLAN == spm_ctx.spm_stat_) {
if (OB_PHY_PLAN_LOCAL == spm_ctx.evolution_plan_type_) {
OZ (local_evolution_plan_.add_plan(pc_ctx, &plan));
// If the local plan starts evolving, clear local_plan_, which was added between adding the
// evolving plan and adding the baseline plan.
if (OB_FAIL(local_evolution_plan_.add_plan(pc_ctx, &plan))) {
LOG_WARN("failed to add local baseline plan");
} else if (local_evolution_plan_.get_is_evolving_flag() && NULL != local_plan_) {
remove_cache_obj_entry(local_plan_->get_plan_id());
local_plan_ = NULL;
}
} else {
OZ (dist_evolution_plan_.add_plan(pc_ctx, &plan));
if (OB_FAIL(dist_evolution_plan_.add_plan(pc_ctx, &plan))) {
LOG_WARN("failed to add dist baseline plan");
} else if (dist_evolution_plan_.get_is_evolving_flag() &&
OB_FAIL(dist_plans_.remove_plan_stat())) {
LOG_WARN("failed to remove dist plans");
}
}
} else {
if (OB_PHY_PLAN_LOCAL == plan_type) {
if (NULL != local_plan_) {
remove_cache_obj_entry(local_plan_->get_plan_id());
local_plan_ = NULL;
// if a baseline local plan already exists, no need to evolve
ret = OB_SQL_PC_PLAN_DUPLICATE;
} else if (OB_FAIL(local_evolution_plan_.add_plan(pc_ctx, &plan))) {
LOG_WARN("failed to add local evolving plan");
} else {
spm_ctx.evolution_plan_type_ = OB_PHY_PLAN_LOCAL;
}
OZ (local_evolution_plan_.add_plan(pc_ctx, &plan));
OX (spm_ctx.evolution_plan_type_ = OB_PHY_PLAN_LOCAL);
} else {
OZ (dist_plans_.remove_plan_stat());
OZ (dist_evolution_plan_.add_plan(pc_ctx, &plan));
OX (spm_ctx.evolution_plan_type_ = OB_PHY_PLAN_DISTRIBUTED);
}
@ -1712,12 +1713,7 @@ int ObSqlPlanSet::add_evolution_plan_for_spm(ObPhysicalPlan *plan, ObPlanCacheCt
LOG_WARN("not supported type", K(ret), K(plan->get_plan_type()));
} else if (FALSE_IT(pc = plan_cache_value_->get_pcv_set()->get_plan_cache())) {
} else if (OB_PHY_PLAN_LOCAL == plan->get_plan_type()) {
if (NULL != local_plan_) {
ret = OB_SQL_PC_PLAN_DUPLICATE;
LOG_WARN("local plan duplicate", K(ret));
} else {
local_plan_ = plan;
}
local_plan_ = plan;
} else if (OB_FAIL(dist_plans_.add_evolution_plan(*plan, ctx))) {
LOG_WARN("failed to add dist plan", K(ret), K(plan));
}
@ -1831,7 +1827,7 @@ int ObSqlPlanSet::try_get_local_evolution_plan(ObPlanCacheCtx &pc_ctx,
int ret = OB_SUCCESS;
plan = NULL;
get_next = false;
if ((!is_spm_closed_ && local_evolution_plan_.get_is_evolving_flag())
if ((local_evolution_plan_.get_is_evolving_flag())
|| pc_ctx.sql_ctx_.spm_ctx_.force_get_evolution_plan()) {
if (OB_FAIL(local_evolution_plan_.get_plan(pc_ctx, plan))) {
if (OB_SQL_PC_NOT_EXIST == ret) {
@ -1853,7 +1849,7 @@ int ObSqlPlanSet::try_get_dist_evolution_plan(ObPlanCacheCtx &pc_ctx,
int ret = OB_SUCCESS;
plan = NULL;
get_next = false;
if ((!is_spm_closed_ && dist_evolution_plan_.get_is_evolving_flag())
if ((dist_evolution_plan_.get_is_evolving_flag())
|| pc_ctx.sql_ctx_.spm_ctx_.force_get_evolution_plan()) {
if (OB_FAIL(dist_evolution_plan_.get_plan(pc_ctx, plan))) {
if (OB_SQL_PC_NOT_EXIST == ret) {

View File

@ -321,12 +321,7 @@ public:
has_duplicate_table_(false),
//has_array_binding_(false),
is_contain_virtual_table_(false),
#ifdef OB_BUILD_SPM
enable_inner_part_parallel_exec_(false),
is_spm_closed_(false)
#else
enable_inner_part_parallel_exec_(false)
#endif
{
}
@ -460,9 +455,6 @@ private:
bool is_contain_virtual_table_;
// whether the PX degree of parallelism is greater than 1
bool enable_inner_part_parallel_exec_;
#ifdef OB_BUILD_SPM
bool is_spm_closed_;
#endif
};
inline ObPlanSetType ObPlanSet::get_plan_set_type_by_cache_obj_type(ObLibCacheNameSpace ns)

View File

@ -8684,19 +8684,11 @@ int ObQueryRange::serialize_cur_keypart(const ObKeyPart &cur, char *buf, int64_t
{
int ret = OB_SUCCESS;
bool has_item_next = (cur.item_next_ != NULL);
bool is_stack_overflow = false;
if (OB_FAIL(check_stack_overflow(is_stack_overflow))) {
LOG_WARN("failed to do stack overflow check", K(ret));
} else if (is_stack_overflow) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("stack overflow", K(ret));
} else {
OB_UNIS_ENCODE(cur);
OB_UNIS_ENCODE(has_item_next);
if (OB_SUCC(ret) && has_item_next) {
if (OB_FAIL(SMART_CALL(serialize_cur_keypart(*cur.item_next_, buf, buf_len, pos)))) {
LOG_WARN("serialize cur keypart failed", K(ret));
}
OB_UNIS_ENCODE(cur);
OB_UNIS_ENCODE(has_item_next);
if (OB_SUCC(ret) && has_item_next) {
if (OB_FAIL(SMART_CALL(serialize_cur_keypart(*cur.item_next_, buf, buf_len, pos)))) {
LOG_WARN("serialize cur keypart failed", K(ret));
}
}
return ret;

View File

@ -255,21 +255,16 @@ struct ObSpmCacheCtx : public ObILibCacheCtx
handle_cache_mode_(MODE_INVALID),
plan_hash_value_(0),
offset_(-1),
check_execute_status_(false),
is_retry_for_spm_(false),
new_plan_hash_(0),
baseline_guard_(PLAN_BASELINE_HANDLE),
spm_stat_(STAT_INVALID),
cache_node_empty_(true),
spm_force_disable_(false),
has_fixed_plan_to_check_(false),
evolution_plan_type_(OB_PHY_PLAN_UNINITIALIZED),
select_plan_type_(INVALID_TYPE),
cur_baseline_not_enable_(false),
need_spm_timeout_(false),
baseline_exec_time_(0),
evolution_task_in_two_plan_set_(false)
{}
flags_(0)
{
cache_node_empty_ = true;
}
enum SpmMode {
MODE_INVALID,
// for get cache obj
@ -322,21 +317,28 @@ struct ObSpmCacheCtx : public ObILibCacheCtx
SpmMode handle_cache_mode_;
uint64_t plan_hash_value_;
int64_t offset_;
bool check_execute_status_;
bool is_retry_for_spm_;
char sql_id_[common::OB_MAX_SQL_ID_LENGTH + 1];
uint64_t new_plan_hash_;
ObCacheObjGuard baseline_guard_;
SpmStat spm_stat_;
bool cache_node_empty_;
bool spm_force_disable_;
bool has_fixed_plan_to_check_;
ObPhyPlanType evolution_plan_type_;
SpmSelectPlanType select_plan_type_; // for retry
bool cur_baseline_not_enable_;
bool need_spm_timeout_;
int64_t baseline_exec_time_;
bool evolution_task_in_two_plan_set_;
union {
uint64_t flags_;
struct {
uint64_t check_execute_status_: 1;
uint64_t is_retry_for_spm_: 1;
uint64_t cache_node_empty_: 1;
uint64_t spm_force_disable_: 1;
uint64_t has_fixed_plan_to_check_: 1;
uint64_t cur_baseline_not_enable_: 1;
uint64_t need_spm_timeout_: 1;
uint64_t evolution_task_in_two_plan_set_: 1;
uint64_t baseline_exists_: 1;
uint64_t reserved_: 55;
};
};
};
struct EvolutionTaskResult