修复“select...FOR UPDATE SKIP LOCKED, RIGHT JOIN, cross join查询与预期不符”

This commit is contained in:
muyulinzhong
2024-01-09 16:54:04 +08:00
parent d2533e77d0
commit 63f3fcbe0f
2 changed files with 90 additions and 74 deletions

View File

@ -145,7 +145,10 @@ static Plan* inheritance_planner(PlannerInfo* root);
static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction);
static void preprocess_rowmarks(PlannerInfo* root);
static double preprocess_limit(PlannerInfo* root, double tuple_fraction, int64* offset_est, int64* count_est);
static void process_sort(Query* parse, PlannerInfo* root, PlannerTargets* plannerTargets, Plan** resultPlan,
List* tlist, List* collectiveGroupExpr, List** currentPathKeys, double limitTuples, FDWUpperRelCxt* ufdwCxt);
static void process_rowMarks(Query* parse, Plan** resultPlan, PlannerInfo* root, List** currentPathKeys,
FDWUpperRelCxt* ufdwCxt);
static bool grouping_is_can_hash(Query* parse, AggClauseCosts* agg_costs);
static Size compute_hash_entry_size(bool vectorized, Path* cheapest_path, int path_width, AggClauseCosts* agg_costs);
static bool choose_hashed_grouping(PlannerInfo* root, double tuple_fraction, double limit_tuples, int path_width,
@ -2728,6 +2731,83 @@ static bool has_ts_func(List* tlist)
}
#endif
static void process_sort(Query* parse, PlannerInfo* root, PlannerTargets* plannerTargets, Plan** resultPlan,
List* tlist, List* collectiveGroupExpr, List** currentPathKeys, double limitTuples, FDWUpperRelCxt* ufdwCxt)
{
/*
* If ORDER BY was given and we were not able to make the plan come out in
* the right order, add an explicit sort step.
*/
if (parse->sortClause) {
if (parse->is_flt_frame && parse->hasTargetSRFs) {
List* sortInputTarget = build_plan_tlist(root, plannerTargets->final_target);
(*resultPlan)->targetlist = sortInputTarget;
}
/*
* Set group_set and again build pathkeys, data's value can be altered groupingSet after,
* so equal expr can not be deleted from pathkeys. Rebuild pathkey EquivalenceClass's ec_group_set
* is true.
*/
rebuild_pathkey_for_groupingSet<sort_pathkey>(root, tlist, NULL, collectiveGroupExpr);
/* we also need to add sort if the sub node is parallized. */
if (!pathkeys_contained_in(root->sort_pathkeys, *currentPathKeys) ||
((*resultPlan)->dop > 1 && root->sort_pathkeys)) {
*resultPlan = (Plan*)make_sort_from_pathkeys(root, *resultPlan, root->sort_pathkeys, limitTuples);
#ifdef STREAMPLAN
if (IS_STREAM_PLAN && check_sort_for_upsert(root))
*resultPlan = make_stream_sort(root, *resultPlan);
#endif /* STREAMPLAN */
if (IS_PGXC_COORDINATOR && !IS_STREAM && !IsConnFromCoord())
*resultPlan = (Plan*)create_remotesort_plan(root, *resultPlan);
*currentPathKeys = root->sort_pathkeys;
}
if (parse->is_flt_frame && parse->hasTargetSRFs) {
*resultPlan = adjust_plan_for_srfs(root, *resultPlan,
plannerTargets->final_targets,
plannerTargets->final_targets_contain_srfs);
}
if (ufdwCxt != NULL && ufdwCxt->state != FDW_UPPER_REL_END) {
ufdwCxt->orderExtra = (OrderPathExtraData*)palloc(sizeof(OrderPathExtraData));
ufdwCxt->orderExtra->targetList = (*resultPlan)->targetlist;
AdvanceFDWUpperPlan(ufdwCxt, UPPERREL_ORDERED, *resultPlan);
}
}
}
static void process_rowMarks(Query* parse, Plan** resultPlan, PlannerInfo* root, List** currentPathKeys,
FDWUpperRelCxt* ufdwCxt)
{
/*
* If there is a FOR [KEY] UPDATE/SHARE clause, add the LockRows node. (Note: we
* intentionally test parse->rowMarks not root->rowMarks here. If there
* are only non-locking rowmarks, they should be handled by the
* ModifyTable node instead.)
*/
if (parse->rowMarks) {
#ifdef ENABLE_MOT
if (!IsMOTEngineUsed()) {
#endif
*resultPlan = (Plan*)make_lockrows(root, *resultPlan);
#ifdef ENABLE_MOT
}
#endif
/*
* The result can no longer be assumed sorted, since redistribute add
* for lockrows may cause the data unsorted.
*/
*currentPathKeys = NIL;
if (ufdwCxt != NULL && ufdwCxt->state != FDW_UPPER_REL_END) {
AdvanceFDWUpperPlan(ufdwCxt, UPPERREL_ROWMARKS, *resultPlan);
}
}
}
/* --------------------
* grouping_planner
* Perform planning steps related to grouping, aggregation, etc.
@ -4613,78 +4693,14 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
AdvanceFDWUpperPlan(ufdwCxt, UPPERREL_WINDOW, result_plan);
}
}
/*
* If there is a FOR [KEY] UPDATE/SHARE clause, add the LockRows node. (Note: we
* intentionally test parse->rowMarks not root->rowMarks here. If there
* are only non-locking rowmarks, they should be handled by the
* ModifyTable node instead.)
*/
if (parse->rowMarks) {
#ifdef ENABLE_MOT
if (!IsMOTEngineUsed()) {
#endif
result_plan = (Plan*)make_lockrows(root, result_plan);
#ifdef ENABLE_MOT
}
#endif
/*
* The result can no longer be assumed sorted, since redistribute add
* for lockrows may cause the data unsorted.
*/
#ifndef PGXC
current_pathkeys = NIL;
#endif
if (ufdwCxt != NULL && ufdwCxt->state != FDW_UPPER_REL_END) {
AdvanceFDWUpperPlan(ufdwCxt, UPPERREL_ROWMARKS, result_plan);
}
}
/*
* If ORDER BY was given and we were not able to make the plan come out in
* the right order, add an explicit sort step.
*/
if (parse->sortClause) {
if (parse->is_flt_frame && parse->hasTargetSRFs) {
List* sort_input_target = build_plan_tlist(root, planner_targets->final_target);
result_plan->targetlist = sort_input_target;
}
/*
* Set group_set and again build pathkeys, data's value can be altered groupingSet after,
* so equal expr can not be deleted from pathkeys. Rebuild pathkey EquivalenceClass's ec_group_set
* is true.
*/
rebuild_pathkey_for_groupingSet<sort_pathkey>(root, tlist, NULL, collectiveGroupExpr);
/* we also need to add sort if the sub node is parallized. */
if (!pathkeys_contained_in(root->sort_pathkeys, current_pathkeys) ||
(result_plan->dop > 1 && root->sort_pathkeys)) {
result_plan = (Plan*)make_sort_from_pathkeys(root, result_plan, root->sort_pathkeys, limit_tuples);
#ifdef PGXC
#ifdef STREAMPLAN
if (IS_STREAM_PLAN && check_sort_for_upsert(root))
result_plan = make_stream_sort(root, result_plan);
#endif /* STREAMPLAN */
if (IS_PGXC_COORDINATOR && !IS_STREAM && !IsConnFromCoord())
result_plan = (Plan*)create_remotesort_plan(root, result_plan);
#endif /* PGXC */
current_pathkeys = root->sort_pathkeys;
}
if (parse->is_flt_frame && parse->hasTargetSRFs) {
result_plan = adjust_plan_for_srfs(root, result_plan,
planner_targets->final_targets,
planner_targets->final_targets_contain_srfs);
}
if (ufdwCxt != NULL && ufdwCxt->state != FDW_UPPER_REL_END) {
ufdwCxt->orderExtra = (OrderPathExtraData*)palloc(sizeof(OrderPathExtraData));
ufdwCxt->orderExtra->targetList = result_plan->targetlist;
AdvanceFDWUpperPlan(ufdwCxt, UPPERREL_ORDERED, result_plan);
}
if (IS_STREAM_PLAN) {
process_rowMarks(parse, &result_plan, root, &current_pathkeys, ufdwCxt);
process_sort(parse, root, planner_targets, &result_plan, tlist, collectiveGroupExpr,
&current_pathkeys, limit_tuples, ufdwCxt);
} else {
process_sort(parse, root, planner_targets, &result_plan, tlist, collectiveGroupExpr,
&current_pathkeys, limit_tuples, ufdwCxt);
process_rowMarks(parse, &result_plan, root, &current_pathkeys, ufdwCxt);
}
/*

View File

@ -5517,7 +5517,7 @@ static void check_global_variables()
}
}
#define BASE_PGXC_LIKE_MACRO_NUM 1397
#define BASE_PGXC_LIKE_MACRO_NUM 1396
static void check_pgxc_like_macros()
{
#ifdef BUILD_BY_CMAKE