Fix smp in merge sort.

This commit is contained in:
totaj
2022-10-18 16:08:36 +08:00
parent 62bbcac3a0
commit 9fddb8b232
4 changed files with 158 additions and 25 deletions

View File

@ -1093,7 +1093,7 @@ bool PreprocessOperator(Node* node, void* context)
errno_t errorno = EOK;
errorno = memcpy_s(node, sizeof(OpExpr), (Node*)newNode, sizeof(OpExpr));
securec_check_c(errorno, "\0", "\0");
securec_check(errorno, "\0", "\0");
}
pfree_ext(newNode);
@ -1124,21 +1124,21 @@ void check_is_support_recursive_cte(PlannerInfo* root)
/* 1. Installation nodegroup, compute nodegroup, single nodegroup. */
if (different_nodegroup_count > 2) {
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
return;
}
/* 2. Installation nodegroup, compute nodegroup. */
if (different_nodegroup_count == 2 && ng_get_single_node_distribution() == NULL) {
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
return;
}
/* 3. Installation nodegroup, single nodegroup which is used */
if (different_nodegroup_count == 2 && u_sess->opt_cxt.is_dngather_support == true) {
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
return;
}
@ -3313,7 +3313,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"set-valued function + groupingsets");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -3321,7 +3321,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"var in quals doesn't exist in targetlist");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
}
@ -3425,7 +3425,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"set-valued expression in qual/targetlist + two-level Groupagg\"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -3573,7 +3573,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Count(Distinct) + Group by\" on redistribution unsupported data type");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
need_sort_for_grouping = true;
@ -3608,7 +3608,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"multi count(distinct) or agg which need order can not ship.");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
} else if (agg_costs.exprAggs != NIL) {
@ -3625,7 +3625,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Count(Distinct)\" on redistribution unsupported data type");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
} else if (result_plan->dop > 1) {
result_plan =
@ -3643,7 +3643,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Aggregate on polymorphic argument type \"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -3653,7 +3653,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Subplan in having qual + two-level Groupagg\"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -3662,7 +3662,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"set-valued expression in qual/targetlist + two-level Groupagg\"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -3702,7 +3702,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"set-valued expression in qual/targetlist + two-level Groupagg\"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -3773,7 +3773,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"set-valued expression in qual/targetlist + two-level Groupagg\"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -4107,7 +4107,7 @@ static Plan* grouping_planner(PlannerInfo* root, double tuple_fraction)
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"set-valued expression in qual/targetlist + two-level distinct\"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -4638,7 +4638,7 @@ static Plan* build_groupingsets_plan(PlannerInfo* root, Query* parse, List** tli
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Count(Distinct)\" on redistribution unsupported data type");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
}
@ -4661,7 +4661,7 @@ static Plan* build_groupingsets_plan(PlannerInfo* root, Query* parse, List** tli
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"String_agg\" or \"Array_agg\" or \"Listagg\" + \"Grouping sets\"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
}
@ -5452,7 +5452,7 @@ List* extract_rollup_sets(List* groupingSets)
adjacency[i] = (short*)palloc((n_adj + 1) * sizeof(short));
errno_t errorno =
memcpy_s(adjacency[i], (n_adj + 1) * sizeof(short), adjacency_buf, (n_adj + 1) * sizeof(short));
securec_check_c(errorno, "\0", "\0");
securec_check(errorno, "\0", "\0");
} else
adjacency[i] = NULL;
@ -6426,7 +6426,7 @@ static bool choose_hashed_grouping(PlannerInfo* root, double tuple_fraction, dou
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Count(Distinct)\" on redistribution unsupported data type");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
}
@ -11151,12 +11151,12 @@ static Plan* generate_hashagg_plan(PlannerInfo* root, Plan* plan, List* final_li
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Subplan in having qual + Group by\" on redistribution unsupported data type");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
} else {
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"String_agg/Array_agg/Listagg + Group by\" on redistribution unsupported data type");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
}
mark_stream_unsupport();
*needs_stream = false;
@ -11730,7 +11730,7 @@ static Plan* get_count_distinct_partial_plan(PlannerInfo* root, Plan* result_pla
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Subplan in having qual + Count(distinct)\"");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
}
@ -11758,7 +11758,7 @@ static Plan* get_count_distinct_partial_plan(PlannerInfo* root, Plan* result_pla
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"\"Count(Distinct)\" on redistribution unsupported data type");
securec_check_ss_c(sprintf_rc, "\0", "\0");
securec_check_ss(sprintf_rc, "\0", "\0");
mark_stream_unsupport();
/* Set final_tlist to plan targetlist, this plan will be discarded. */
result_plan->targetlist = *final_tlist;

View File

@ -2144,6 +2144,7 @@ StreamState* ExecInitStream(Stream* node, EState* estate, int eflags)
state->tempTupleVec->tupleVector[j] = MakeTupleTableSlot(false);
ExecSetSlotDescriptor(state->tempTupleVec->tupleVector[j], typeInfo);
}
node->sort = NULL;
}
state->StreamDeserialize = GetTupleFromConnBuffer;

View File

@ -1204,11 +1204,92 @@ select * from (select a, rownum as row from (select a from t3) where rownum <= 1
10 | 10
(6 rows)
CREATE TABLE bmsql_item (
i_id int NoT NULL,
i_name varchar(24),
i_price numeric(5,2),
i_data varchar( 50),
i_im_id int
);
insert into bmsql_item values ('1','sqltest_varchar_1','0.01','sqltest_varchar_1','1') ;
insert into bmsql_item values ('2','sqltest_varchar_2','0.02','sqltest_varchar_2','2') ;
insert into bmsql_item values ('3','sqltest_varchar_3','0.03','sqltest_varchar_3','3') ;
insert into bmsql_item values ('4','sqltest_varchar_4','0.04','sqltest_varchar_4','4') ;
insert into bmsql_item(i_id) values ('5');
create table bmsql_warehouse(
w_id int not null,
w_ytd numeric(12,2),
w_tax numeric(4,4),
w_name varchar(10),
w_street_1 varchar(20),
w_street_2 varchar(20),
w_city varchar(20),
w_state char(2),
w_zip char(9)
);
insert into bmsql_warehouse values('1','0.01','0.0001','sqltest_va','sqltest_varchar_1','sqltest_varchar_1','sqltest_varchar_1','sq','sqltest_b');
insert into bmsql_warehouse values('2','0.02','0.0002','sqltest_va','sqltest_varchar_2','sqltest_varchar_2','sqltest_varchar_2','sq','sqltest_b');
insert into bmsql_warehouse values('3','0.03','0.0003','sqltest_va','sqltest_varchar_3','sqltest_varchar_3','sqltest_varchar_3','sq','sqltest_b');
insert into bmsql_warehouse values('4','0.04','0.0004','sqltest_va','sqltest_varchar_4','sqltest_varchar_4','sqltest_varchar_4','sq','sqltest_b');
insert into bmsql_warehouse(w_id) values('5');
set query_dop=4;
explain (costs off) select 0.01
from bmsql_item
intersect
select first_value(i_price) over (order by 2)
from bmsql_item
where i_id <=(select w_id from bmsql_warehouse
where bmsql_item.i_name not like 'sqltest_varchar_2' order by 1 limit 1)
group by i_price;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
HashSetOp Intersect
-> Append
-> Subquery Scan on "*SELECT* 1"
-> Seq Scan on bmsql_item
-> Subquery Scan on "*SELECT* 2"
-> WindowAgg
-> Streaming(type: BROADCAST dop: 1/4)
-> Group
Group By Key: test_smp.bmsql_item.i_price
-> Sort
Sort Key: test_smp.bmsql_item.i_price
-> Streaming(type: LOCAL REDISTRIBUTE dop: 4/1)
-> Group
Group By Key: test_smp.bmsql_item.i_price
-> Sort
Sort Key: test_smp.bmsql_item.i_price
-> Seq Scan on bmsql_item
Filter: (i_id <= (SubPlan 1))
SubPlan 1
-> Limit
-> Sort
Sort Key: bmsql_warehouse.w_id
-> Result
One-Time Filter: ((test_smp.bmsql_item.i_name)::text !~~ 'sqltest_varchar_2'::text)
-> Seq Scan on bmsql_warehouse
(25 rows)
select 0.01
from bmsql_item
intersect
select first_value(i_price) over (order by 2)
from bmsql_item
where i_id <=(select w_id from bmsql_warehouse
where bmsql_item.i_name not like 'sqltest_varchar_2' order by 1 limit 1)
group by i_price;
?column?
----------
.01
(1 row)
--clean
set search_path=public;
drop schema test_smp cascade;
NOTICE: drop cascades to 4 other objects
NOTICE: drop cascades to 6 other objects
DETAIL: drop cascades to table test_smp.t1
drop cascades to table test_smp.t2
drop cascades to table test_smp.t3
drop cascades to table test_smp.t4
drop cascades to table test_smp.bmsql_item
drop cascades to table test_smp.bmsql_warehouse

View File

@ -77,6 +77,57 @@ select (select max(id) from t4);
explain (costs off) select * from (select a, rownum as row from (select a from t3) where rownum <= 10) where row >=5;
select * from (select a, rownum as row from (select a from t3) where rownum <= 10) where row >=5;
CREATE TABLE bmsql_item (
i_id int NoT NULL,
i_name varchar(24),
i_price numeric(5,2),
i_data varchar( 50),
i_im_id int
);
insert into bmsql_item values ('1','sqltest_varchar_1','0.01','sqltest_varchar_1','1') ;
insert into bmsql_item values ('2','sqltest_varchar_2','0.02','sqltest_varchar_2','2') ;
insert into bmsql_item values ('3','sqltest_varchar_3','0.03','sqltest_varchar_3','3') ;
insert into bmsql_item values ('4','sqltest_varchar_4','0.04','sqltest_varchar_4','4') ;
insert into bmsql_item(i_id) values ('5');
create table bmsql_warehouse(
w_id int not null,
w_ytd numeric(12,2),
w_tax numeric(4,4),
w_name varchar(10),
w_street_1 varchar(20),
w_street_2 varchar(20),
w_city varchar(20),
w_state char(2),
w_zip char(9)
);
insert into bmsql_warehouse values('1','0.01','0.0001','sqltest_va','sqltest_varchar_1','sqltest_varchar_1','sqltest_varchar_1','sq','sqltest_b');
insert into bmsql_warehouse values('2','0.02','0.0002','sqltest_va','sqltest_varchar_2','sqltest_varchar_2','sqltest_varchar_2','sq','sqltest_b');
insert into bmsql_warehouse values('3','0.03','0.0003','sqltest_va','sqltest_varchar_3','sqltest_varchar_3','sqltest_varchar_3','sq','sqltest_b');
insert into bmsql_warehouse values('4','0.04','0.0004','sqltest_va','sqltest_varchar_4','sqltest_varchar_4','sqltest_varchar_4','sq','sqltest_b');
insert into bmsql_warehouse(w_id) values('5');
set query_dop=4;
explain (costs off) select 0.01
from bmsql_item
intersect
select first_value(i_price) over (order by 2)
from bmsql_item
where i_id <=(select w_id from bmsql_warehouse
where bmsql_item.i_name not like 'sqltest_varchar_2' order by 1 limit 1)
group by i_price;
select 0.01
from bmsql_item
intersect
select first_value(i_price) over (order by 2)
from bmsql_item
where i_id <=(select w_id from bmsql_warehouse
where bmsql_item.i_name not like 'sqltest_varchar_2' order by 1 limit 1)
group by i_price;
--clean
set search_path=public;
drop schema test_smp cascade;