!474 并行查询bug修复

Merge pull request !474 from TotaJ/feature/test
This commit is contained in:
opengauss-bot
2020-12-09 10:10:46 +08:00
committed by Gitee
6 changed files with 131 additions and 29 deletions

View File

@ -215,7 +215,34 @@ void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
switch (code) {
case PG_DIAG_SEVERITY:
/* ignore, trusting we'll get a nonlocalized version */
if (strcmp(value, "DEBUG") == 0) {
/*
* We can't reconstruct the exact DEBUG level, but
* presumably it was >= client_min_messages, so select
* DEBUG1 to ensure we'll pass it on to the client.
*/
edata->elevel = DEBUG1;
} else if (strcmp(value, "LOG") == 0) {
/*
* It can't be LOG_SERVER_ONLY, or the worker wouldn't
* have sent it to us; so LOG is the correct value.
*/
edata->elevel = LOG;
} else if (strcmp(value, "INFO") == 0) {
edata->elevel = INFO;
} else if (strcmp(value, "NOTICE") == 0) {
edata->elevel = NOTICE;
} else if (strcmp(value, "WARNING") == 0) {
edata->elevel = WARNING;
} else if (strcmp(value, "ERROR") == 0) {
edata->elevel = ERROR;
} else if (strcmp(value, "FATAL") == 0) {
edata->elevel = FATAL;
} else if (strcmp(value, "PANIC") == 0) {
edata->elevel = PANIC;
} else {
elog(ERROR, "unrecognized error severity: \"%s\"", value);
}
break;
case PG_DIAG_INTERNEL_ERRCODE:
/* ignore */

View File

@ -2953,6 +2953,12 @@ void final_cost_nestloop(PlannerInfo* root, NestPath* path, JoinCostWorkspace* w
/* Mark the path with the correct row estimate */
set_rel_path_rows(&path->path, path->path.parent, path->path.param_info);
/* For partial paths, scale row estimate. */
if (path->path.parallel_workers > 0) {
double parallel_divisor = get_parallel_divisor(&path->path);
path->path.rows = clamp_row_est(path->path.rows / parallel_divisor);
}
/*
* If inner_path or outer_path is EC functioinScan without stream,
* we should set the multiple particularly.
@ -3319,6 +3325,12 @@ void final_cost_mergejoin(
/* Mark the path with the correct row estimate */
set_rel_path_rows(&path->jpath.path, path->jpath.path.parent, path->jpath.path.param_info);
/* For partial paths, scale row estimate. */
if (path->jpath.path.parallel_workers > 0) {
double parallel_divisor = get_parallel_divisor(&path->jpath.path);
path->jpath.path.rows = clamp_row_est(path->jpath.path.rows / parallel_divisor);
}
/*
* If inner_path or outer_path is EC functioinScan without stream,
* we should set the multiple particularly.
@ -3896,6 +3908,12 @@ void final_cost_hashjoin(PlannerInfo* root, HashPath* path, JoinCostWorkspace* w
/* Mark the path with the correct row estimate */
set_rel_path_rows(&path->jpath.path, path->jpath.path.parent, path->jpath.path.param_info);
/* For partial paths, scale row estimate. */
if (path->jpath.path.parallel_workers > 0) {
double parallel_divisor = get_parallel_divisor(&path->jpath.path);
path->jpath.path.rows = clamp_row_est(path->jpath.path.rows / parallel_divisor);
}
/*
* If inner_path or outer_path is EC functioinScan without stream,
* we should set the multiple particularly.

View File

@ -68,7 +68,7 @@ select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b
-- Forbid parallel Hash Right Join or Hash Full Join.
explain (costs off)select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id order by parallel_hashjoin_test_a.id limit 10;
QUERY PLAN
QUERY PLAN
--------------------------------------------------------------------------------------
Limit
-> Sort
@ -96,6 +96,7 @@ select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on par
(10 rows)
-- parallel increase hash buckets
set enable_mergejoin=off;
DROP TABLE IF EXISTS par_hash_incr_bucket_a;
NOTICE: table "par_hash_incr_bucket_a" does not exist, skipping
DROP TABLE IF EXISTS par_hash_incr_bucket_b;
@ -117,23 +118,23 @@ where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b
and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d
and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a
and par_hash_incr_bucket_b.e %2 =0;
QUERY PLAN
QUERY PLAN
--------------------------------------------------------------------------------------------
Aggregate
-> Hash Join
Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_d.b)
-> Gather
Number of Workers: 2
-> Parallel Hash Join
Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_c.a)
-> Hash Join
Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_c.a)
-> Gather
Number of Workers: 2
-> Parallel Hash Join
Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d)
-> Parallel Seq Scan on par_hash_incr_bucket_a
-> Parallel Hash
-> Parallel Seq Scan on par_hash_incr_bucket_b
Filter: ((e % 2) = 0)
-> Parallel Hash
-> Parallel Seq Scan on par_hash_incr_bucket_c
-> Hash
-> Seq Scan on par_hash_incr_bucket_c
-> Hash
-> Seq Scan on par_hash_incr_bucket_d
(17 rows)
@ -143,7 +144,7 @@ where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b
and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d
and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a
and par_hash_incr_bucket_b.e %2 =0;
count
count
-------
50000
(1 row)
@ -158,4 +159,5 @@ reset parallel_setup_cost;
reset min_parallel_table_scan_size;
reset parallel_tuple_cost;
reset enable_nestloop;
reset enable_mergejoin;
reset force_parallel_mode;

View File

@ -332,15 +332,15 @@ FROM t_result t;
-> Parallel Index Scan using test_with_rescan_sj_dm_idx on test_with_rescan
Index Cond: (sj_dm < 10)
-> Merge Join
Merge Cond: (t1.dm = t2.sj_dm)
-> Sort
Sort Key: t1.dm
-> WorkTable Scan on t_result t1
Merge Cond: (t2.sj_dm = t1.dm)
-> Sort
Sort Key: t2.sj_dm
-> Gather
Number of Workers: 1
-> Parallel Index Scan using test_with_rescan_sj_dm_idx on test_with_rescan t2
-> Sort
Sort Key: t1.dm
-> WorkTable Scan on t_result t1
(20 rows)
WITH recursive t_result AS (
@ -1078,7 +1078,21 @@ select boo_3.a from subplan_tb1 boo_3 ) then boo_1.a end ) in
---------------------------------------------------------------------------------------------------
Merge Join
Output: boo_1.a
Merge Cond: ((max(boo.a)) = (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END))
Merge Cond: ((CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END) = (max(boo.a)))
-> Sort
Output: boo_1.a, (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END)
Sort Key: (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END)
-> Gather
Output: boo_1.a, CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END
Number of Workers: 2
-> Parallel Seq Scan on public.subplan_tb1 boo_1
Output: boo_1.a
SubPlan 1
-> Gather
Output: boo_3.a
Number of Workers: 2
-> Parallel Seq Scan on public.subplan_tb1 boo_3
Output: boo_3.a
-> Sort
Output: (max(boo.a))
Sort Key: (max(boo.a))
@ -1096,20 +1110,6 @@ select boo_3.a from subplan_tb1 boo_3 ) then boo_1.a end ) in
Number of Workers: 2
-> Parallel Seq Scan on public.subplan_tb1 boo_2
Output: boo_2.a, boo_2.b
-> Sort
Output: boo_1.a, (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END)
Sort Key: (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END)
-> Gather
Output: boo_1.a, CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END
Number of Workers: 2
-> Parallel Seq Scan on public.subplan_tb1 boo_1
Output: boo_1.a
SubPlan 1
-> Gather
Output: boo_3.a
Number of Workers: 2
-> Parallel Seq Scan on public.subplan_tb1 boo_3
Output: boo_3.a
(34 rows)
--clean up
@ -1163,6 +1163,45 @@ explain (costs off, analyse on) select schemaname, tablename from pg_tables wher
--? Total runtime: [0-9]*\.[0-9]*\.\.[0-9]*\.[0-9]* ms
(10 rows)
--worker send NOTICE message to leader
CREATE FUNCTION f_parallel_notice (text)
RETURNS bool LANGUAGE 'plpgsql' COST 0.0000001
AS 'BEGIN RAISE NOTICE ''notice => %'', $1; RETURN true; END';
CREATE TABLE notice_table(a int);
insert into notice_table values(generate_series(1,5));
--let worker do the work
set parallel_leader_participation=off;
explain select * from notice_table where f_parallel_notice(a);
QUERY PLAN
------------------------------------------------------------------------------
Gather (cost=0.00..20.02 rows=801 width=4)
Number of Workers: 2
-> Parallel Seq Scan on notice_table (cost=0.00..20.01 rows=400 width=4)
Filter: f_parallel_notice((a)::text)
(4 rows)
select * from notice_table where f_parallel_notice(a);
NOTICE: notice => 1
CONTEXT: parallel worker
NOTICE: notice => 2
CONTEXT: parallel worker
NOTICE: notice => 3
CONTEXT: parallel worker
NOTICE: notice => 4
CONTEXT: parallel worker
NOTICE: notice => 5
CONTEXT: parallel worker
a
---
1
2
3
4
5
(5 rows)
drop table notice_table;
drop function f_parallel_notice;
--clean up
reset force_parallel_mode;
reset parallel_setup_cost;

View File

@ -21,6 +21,7 @@ explain (costs off)select * from parallel_hashjoin_test_a full join parallel_has
select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id order by parallel_hashjoin_test_a.id limit 10;
-- parallel increase hash buckets
set enable_mergejoin=off;
DROP TABLE IF EXISTS par_hash_incr_bucket_a;
DROP TABLE IF EXISTS par_hash_incr_bucket_b;
DROP TABLE IF EXISTS par_hash_incr_bucket_c;
@ -54,6 +55,7 @@ reset parallel_setup_cost;
reset min_parallel_table_scan_size;
reset parallel_tuple_cost;
reset enable_nestloop;
reset enable_mergejoin;
reset force_parallel_mode;

View File

@ -318,6 +318,20 @@ set min_parallel_table_scan_size=0;
set parallel_leader_participation=on;
-- nestloop
explain (costs off, analyse on) select schemaname, tablename from pg_tables where tablename like 'sql%' order by tablename;
--worker send NOTICE message to leader
CREATE FUNCTION f_parallel_notice (text)
RETURNS bool LANGUAGE 'plpgsql' COST 0.0000001
AS 'BEGIN RAISE NOTICE ''notice => %'', $1; RETURN true; END';
CREATE TABLE notice_table(a int);
insert into notice_table values(generate_series(1,5));
--let worker do the work
set parallel_leader_participation=off;
explain select * from notice_table where f_parallel_notice(a);
select * from notice_table where f_parallel_notice(a);
drop table notice_table;
drop function f_parallel_notice;
--clean up
reset force_parallel_mode;
reset parallel_setup_cost;