diff --git a/src/common/backend/libpq/pqmq.cpp b/src/common/backend/libpq/pqmq.cpp index 94a304533..c447e6e52 100644 --- a/src/common/backend/libpq/pqmq.cpp +++ b/src/common/backend/libpq/pqmq.cpp @@ -215,7 +215,34 @@ void pq_parse_errornotice(StringInfo msg, ErrorData *edata) switch (code) { case PG_DIAG_SEVERITY: - /* ignore, trusting we'll get a nonlocalized version */ + if (strcmp(value, "DEBUG") == 0) { + /* + * We can't reconstruct the exact DEBUG level, but + * presumably it was >= client_min_messages, so select + * DEBUG1 to ensure we'll pass it on to the client. + */ + edata->elevel = DEBUG1; + } else if (strcmp(value, "LOG") == 0) { + /* + * It can't be LOG_SERVER_ONLY, or the worker wouldn't + * have sent it to us; so LOG is the correct value. + */ + edata->elevel = LOG; + } else if (strcmp(value, "INFO") == 0) { + edata->elevel = INFO; + } else if (strcmp(value, "NOTICE") == 0) { + edata->elevel = NOTICE; + } else if (strcmp(value, "WARNING") == 0) { + edata->elevel = WARNING; + } else if (strcmp(value, "ERROR") == 0) { + edata->elevel = ERROR; + } else if (strcmp(value, "FATAL") == 0) { + edata->elevel = FATAL; + } else if (strcmp(value, "PANIC") == 0) { + edata->elevel = PANIC; + } else { + elog(ERROR, "unrecognized error severity: \"%s\"", value); + } break; case PG_DIAG_INTERNEL_ERRCODE: /* ignore */ diff --git a/src/gausskernel/optimizer/path/costsize.cpp b/src/gausskernel/optimizer/path/costsize.cpp index ba3b12033..edba68e1a 100644 --- a/src/gausskernel/optimizer/path/costsize.cpp +++ b/src/gausskernel/optimizer/path/costsize.cpp @@ -2953,6 +2953,12 @@ void final_cost_nestloop(PlannerInfo* root, NestPath* path, JoinCostWorkspace* w /* Mark the path with the correct row estimate */ set_rel_path_rows(&path->path, path->path.parent, path->path.param_info); + /* For partial paths, scale row estimate. */ + if (path->path.parallel_workers > 0) { + double parallel_divisor = get_parallel_divisor(&path->path); + path->path.rows = clamp_row_est(path->path.rows / parallel_divisor); + } + /* * If inner_path or outer_path is EC functioinScan without stream, * we should set the multiple particularly. @@ -3319,6 +3325,12 @@ void final_cost_mergejoin( /* Mark the path with the correct row estimate */ set_rel_path_rows(&path->jpath.path, path->jpath.path.parent, path->jpath.path.param_info); + /* For partial paths, scale row estimate. */ + if (path->jpath.path.parallel_workers > 0) { + double parallel_divisor = get_parallel_divisor(&path->jpath.path); + path->jpath.path.rows = clamp_row_est(path->jpath.path.rows / parallel_divisor); + } + /* * If inner_path or outer_path is EC functioinScan without stream, * we should set the multiple particularly. @@ -3896,6 +3908,12 @@ void final_cost_hashjoin(PlannerInfo* root, HashPath* path, JoinCostWorkspace* w /* Mark the path with the correct row estimate */ set_rel_path_rows(&path->jpath.path, path->jpath.path.parent, path->jpath.path.param_info); + /* For partial paths, scale row estimate. */ + if (path->jpath.path.parallel_workers > 0) { + double parallel_divisor = get_parallel_divisor(&path->jpath.path); + path->jpath.path.rows = clamp_row_est(path->jpath.path.rows / parallel_divisor); + } + /* * If inner_path or outer_path is EC functioinScan without stream, * we should set the multiple particularly. diff --git a/src/test/regress/expected/parallel_hashjoin.out b/src/test/regress/expected/parallel_hashjoin.out index e5eeba810..622b90952 100644 --- a/src/test/regress/expected/parallel_hashjoin.out +++ b/src/test/regress/expected/parallel_hashjoin.out @@ -68,7 +68,7 @@ select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b -- Forbid parallel Hash Right Join or Hash Full Join. explain (costs off)select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id order by parallel_hashjoin_test_a.id limit 10; - QUERY PLAN + QUERY PLAN -------------------------------------------------------------------------------------- Limit -> Sort @@ -96,6 +96,7 @@ select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on par (10 rows) -- parallel increase hash buckets +set enable_mergejoin=off; DROP TABLE IF EXISTS par_hash_incr_bucket_a; NOTICE: table "par_hash_incr_bucket_a" does not exist, skipping DROP TABLE IF EXISTS par_hash_incr_bucket_b; @@ -117,23 +118,23 @@ where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a and par_hash_incr_bucket_b.e %2 =0; - QUERY PLAN + QUERY PLAN -------------------------------------------------------------------------------------------- Aggregate -> Hash Join Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_d.b) - -> Gather - Number of Workers: 2 - -> Parallel Hash Join - Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_c.a) + -> Hash Join + Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_c.a) + -> Gather + Number of Workers: 2 -> Parallel Hash Join Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d) -> Parallel Seq Scan on par_hash_incr_bucket_a -> Parallel Hash -> Parallel Seq Scan on par_hash_incr_bucket_b Filter: ((e % 2) = 0) - -> Parallel Hash - -> Parallel Seq Scan on par_hash_incr_bucket_c + -> Hash + -> Seq Scan on par_hash_incr_bucket_c -> Hash -> Seq Scan on par_hash_incr_bucket_d (17 rows) @@ -143,7 +144,7 @@ where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a and par_hash_incr_bucket_b.e %2 =0; - count + count ------- 50000 (1 row) @@ -158,4 +159,5 @@ reset parallel_setup_cost; reset min_parallel_table_scan_size; reset parallel_tuple_cost; reset enable_nestloop; +reset enable_mergejoin; reset force_parallel_mode; diff --git a/src/test/regress/expected/parallel_query.out b/src/test/regress/expected/parallel_query.out index 4ad1ecd5a..99a424578 100644 --- a/src/test/regress/expected/parallel_query.out +++ b/src/test/regress/expected/parallel_query.out @@ -332,15 +332,15 @@ FROM t_result t; -> Parallel Index Scan using test_with_rescan_sj_dm_idx on test_with_rescan Index Cond: (sj_dm < 10) -> Merge Join - Merge Cond: (t1.dm = t2.sj_dm) - -> Sort - Sort Key: t1.dm - -> WorkTable Scan on t_result t1 + Merge Cond: (t2.sj_dm = t1.dm) -> Sort Sort Key: t2.sj_dm -> Gather Number of Workers: 1 -> Parallel Index Scan using test_with_rescan_sj_dm_idx on test_with_rescan t2 + -> Sort + Sort Key: t1.dm + -> WorkTable Scan on t_result t1 (20 rows) WITH recursive t_result AS ( @@ -1078,7 +1078,21 @@ select boo_3.a from subplan_tb1 boo_3 ) then boo_1.a end ) in --------------------------------------------------------------------------------------------------- Merge Join Output: boo_1.a - Merge Cond: ((max(boo.a)) = (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END)) + Merge Cond: ((CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END) = (max(boo.a))) + -> Sort + Output: boo_1.a, (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END) + Sort Key: (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END) + -> Gather + Output: boo_1.a, CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END + Number of Workers: 2 + -> Parallel Seq Scan on public.subplan_tb1 boo_1 + Output: boo_1.a + SubPlan 1 + -> Gather + Output: boo_3.a + Number of Workers: 2 + -> Parallel Seq Scan on public.subplan_tb1 boo_3 + Output: boo_3.a -> Sort Output: (max(boo.a)) Sort Key: (max(boo.a)) @@ -1096,20 +1110,6 @@ select boo_3.a from subplan_tb1 boo_3 ) then boo_1.a end ) in Number of Workers: 2 -> Parallel Seq Scan on public.subplan_tb1 boo_2 Output: boo_2.a, boo_2.b - -> Sort - Output: boo_1.a, (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END) - Sort Key: (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END) - -> Gather - Output: boo_1.a, CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END - Number of Workers: 2 - -> Parallel Seq Scan on public.subplan_tb1 boo_1 - Output: boo_1.a - SubPlan 1 - -> Gather - Output: boo_3.a - Number of Workers: 2 - -> Parallel Seq Scan on public.subplan_tb1 boo_3 - Output: boo_3.a (34 rows) --clean up @@ -1163,6 +1163,45 @@ explain (costs off, analyse on) select schemaname, tablename from pg_tables wher --? Total runtime: [0-9]*\.[0-9]*\.\.[0-9]*\.[0-9]* ms (10 rows) +--worker send NOTICE message to leader +CREATE FUNCTION f_parallel_notice (text) + RETURNS bool LANGUAGE 'plpgsql' COST 0.0000001 + AS 'BEGIN RAISE NOTICE ''notice => %'', $1; RETURN true; END'; +CREATE TABLE notice_table(a int); +insert into notice_table values(generate_series(1,5)); +--let worker do the work +set parallel_leader_participation=off; +explain select * from notice_table where f_parallel_notice(a); + QUERY PLAN +------------------------------------------------------------------------------ + Gather (cost=0.00..20.02 rows=801 width=4) + Number of Workers: 2 + -> Parallel Seq Scan on notice_table (cost=0.00..20.01 rows=400 width=4) + Filter: f_parallel_notice((a)::text) +(4 rows) + +select * from notice_table where f_parallel_notice(a); +NOTICE: notice => 1 +CONTEXT: parallel worker +NOTICE: notice => 2 +CONTEXT: parallel worker +NOTICE: notice => 3 +CONTEXT: parallel worker +NOTICE: notice => 4 +CONTEXT: parallel worker +NOTICE: notice => 5 +CONTEXT: parallel worker + a +--- + 1 + 2 + 3 + 4 + 5 +(5 rows) + +drop table notice_table; +drop function f_parallel_notice; --clean up reset force_parallel_mode; reset parallel_setup_cost; diff --git a/src/test/regress/sql/parallel_hashjoin.sql b/src/test/regress/sql/parallel_hashjoin.sql index 64d25308d..8ada69303 100644 --- a/src/test/regress/sql/parallel_hashjoin.sql +++ b/src/test/regress/sql/parallel_hashjoin.sql @@ -21,6 +21,7 @@ explain (costs off)select * from parallel_hashjoin_test_a full join parallel_has select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id order by parallel_hashjoin_test_a.id limit 10; -- parallel increase hash buckets +set enable_mergejoin=off; DROP TABLE IF EXISTS par_hash_incr_bucket_a; DROP TABLE IF EXISTS par_hash_incr_bucket_b; DROP TABLE IF EXISTS par_hash_incr_bucket_c; @@ -54,6 +55,7 @@ reset parallel_setup_cost; reset min_parallel_table_scan_size; reset parallel_tuple_cost; reset enable_nestloop; +reset enable_mergejoin; reset force_parallel_mode; diff --git a/src/test/regress/sql/parallel_query.sql b/src/test/regress/sql/parallel_query.sql index 30440ede4..382296f37 100644 --- a/src/test/regress/sql/parallel_query.sql +++ b/src/test/regress/sql/parallel_query.sql @@ -318,6 +318,20 @@ set min_parallel_table_scan_size=0; set parallel_leader_participation=on; -- nestloop explain (costs off, analyse on) select schemaname, tablename from pg_tables where tablename like 'sql%' order by tablename; + +--worker send NOTICE message to leader +CREATE FUNCTION f_parallel_notice (text) + RETURNS bool LANGUAGE 'plpgsql' COST 0.0000001 + AS 'BEGIN RAISE NOTICE ''notice => %'', $1; RETURN true; END'; +CREATE TABLE notice_table(a int); +insert into notice_table values(generate_series(1,5)); +--let worker do the work +set parallel_leader_participation=off; +explain select * from notice_table where f_parallel_notice(a); +select * from notice_table where f_parallel_notice(a); +drop table notice_table; +drop function f_parallel_notice; + --clean up reset force_parallel_mode; reset parallel_setup_cost;