From 584d420639f0fa594fd4a34d2dbbb38120378360 Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Mon, 15 Jul 2024 15:14:46 +0800 Subject: [PATCH] add testcase smp_cursor&¶llel_enable_function --- .../expected/parallel_enable_function.out | 670 ++++++++++++++++++ src/test/regress/expected/smp_cursor.out | 523 ++++++++++++++ src/test/regress/input/gs_dump_package.source | 42 ++ .../regress/output/gs_dump_package.source | 69 ++ src/test/regress/parallel_schedule0 | 2 +- src/test/regress/parallel_schedule0A | 2 +- .../regress/sql/parallel_enable_function.sql | 362 ++++++++++ src/test/regress/sql/smp_cursor.sql | 82 +++ 8 files changed, 1750 insertions(+), 2 deletions(-) create mode 100644 src/test/regress/expected/parallel_enable_function.out create mode 100644 src/test/regress/expected/smp_cursor.out create mode 100644 src/test/regress/sql/parallel_enable_function.sql create mode 100644 src/test/regress/sql/smp_cursor.sql diff --git a/src/test/regress/expected/parallel_enable_function.out b/src/test/regress/expected/parallel_enable_function.out new file mode 100644 index 000000000..36f755b03 --- /dev/null +++ b/src/test/regress/expected/parallel_enable_function.out @@ -0,0 +1,670 @@ +create schema parallel_enable_function; +set search_path=parallel_enable_function; +create table employees (employee_id number(6), department_id NUMBER, first_name varchar2(30), last_name varchar2(30), email varchar2(30), phone_number varchar2(30)); +BEGIN + FOR i IN 1..100 LOOP + INSERT INTO employees VALUES (i, 60, 'abc', 'def', '123', '123'); + END LOOP; + COMMIT; +END; +/ +CREATE TYPE my_outrec_typ AS ( + employee_id numeric(6,0), + department_id numeric, + first_name character varying(30), + last_name character varying(30), + email character varying(30), + phone_number character varying(30) +); +-- create srf function with parallel_enable +CREATE OR REPLACE FUNCTION hash_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END hash_srf; +/ +NOTICE: immutable would be set if parallel_enable specified +CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END any_srf; +/ +NOTICE: immutable would be set if parallel_enable specified +-- create function with multi-partkey +CREATE OR REPLACE FUNCTION multi_partkey_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id, department_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END multi_partkey_srf; +/ +NOTICE: immutable would be set if parallel_enable specified +-- create pipelined function +create type table_my_outrec_typ is table of my_outrec_typ; +CREATE OR REPLACE FUNCTION pipelined_table_f (p SYS_REFCURSOR) RETURN table_my_outrec_typ pipelined parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + pipe row(out_rec); + END LOOP; +END pipelined_table_f; +/ +NOTICE: immutable would be set if parallel_enable specified +CREATE OR REPLACE FUNCTION pipelined_array_f (p SYS_REFCURSOR) RETURN _employees PIPELINED parallel_enable (partition p by any) + IS + in_rec my_outrec_typ; + BEGIN +LOOP + FETCH p INTO in_rec; + EXIT WHEN p%NOTFOUND; + PIPE ROW (in_rec); + END LOOP; +END pipelined_array_f; +/ +NOTICE: immutable would be set if parallel_enable specified +-- without partition by +CREATE OR REPLACE FUNCTION no_partition_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END no_partition_srf; +/ +NOTICE: immutable would be set if parallel_enable specified +-- call function +set query_dop = 1002; +explain (costs off) select * from hash_srf(cursor (select * from employees)) limit 10; + QUERY PLAN +---------------------------------------------- + Limit + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Limit + -> Function Scan on hash_srf +(4 rows) + +select * from hash_srf(cursor (select * from employees)) limit 10; + employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+-------------- +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +(10 rows) + +explain (costs off) select * from any_srf(cursor (select * from employees)) limit 10; + QUERY PLAN +---------------------------------------------- + Limit + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Limit + -> Function Scan on any_srf +(4 rows) + +select * from any_srf(cursor (select * from employees)) limit 10; + employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+-------------- +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +(10 rows) + +explain (costs off) select * from pipelined_table_f(cursor (select * from employees)) limit 10; + QUERY PLAN +------------------------------------------------------ + Limit + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Limit + -> Function Scan on pipelined_table_f +(4 rows) + +select * from pipelined_table_f(cursor (select * from employees)) limit 10; + employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+-------------- +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +(10 rows) + +explain (costs off) select * from multi_partkey_srf(cursor (select * from employees)) limit 10; + QUERY PLAN +------------------------------------------------------ + Limit + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Limit + -> Function Scan on multi_partkey_srf +(4 rows) + +select * from multi_partkey_srf(cursor (select * from employees)) limit 10; + employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+-------------- +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +(10 rows) + +explain (costs off) select * from pipelined_array_f(cursor (select * from employees)) limit 10; + QUERY PLAN +------------------------------------------------------ + Limit + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Limit + -> Function Scan on pipelined_array_f +(4 rows) + +select * from pipelined_array_f(cursor (select * from employees)) limit 10; + employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+-------------- +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +(10 rows) + +explain (costs off) select * from no_partition_srf(cursor (select * from employees)) limit 10; + QUERY PLAN +----------------------------------------- + Limit + -> Function Scan on no_partition_srf +(2 rows) + +select * from no_partition_srf(cursor (select * from employees)) limit 10; + employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+-------------- + 1 | 60 | abc | def | 123 | 123 + 2 | 60 | abc | def | 123 | 123 + 3 | 60 | abc | def | 123 | 123 + 4 | 60 | abc | def | 123 | 123 + 5 | 60 | abc | def | 123 | 123 + 6 | 60 | abc | def | 123 | 123 + 7 | 60 | abc | def | 123 | 123 + 8 | 60 | abc | def | 123 | 123 + 9 | 60 | abc | def | 123 | 123 + 10 | 60 | abc | def | 123 | 123 +(10 rows) + +-- test count(*) +explain (costs off) select count(*) from hash_srf(cursor (select * from employees)); + QUERY PLAN +---------------------------------------------- + Aggregate + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Aggregate + -> Function Scan on hash_srf +(4 rows) + +select count(*) from hash_srf(cursor (select * from employees)); + count +------- + 100 +(1 row) + +-- test multi cursor args +CREATE OR REPLACE FUNCTION multi_cursor_srf (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(employee_id)) IS + out_rec_1 my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); + out_rec_2 my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p1 INTO out_rec_1.employee_id, out_rec_1.department_id, out_rec_1.first_name, out_rec_1.last_name, out_rec_1.email, out_rec_1.phone_number; -- input row + EXIT WHEN p1%NOTFOUND; + FETCH p2 INTO out_rec_2.employee_id, out_rec_2.department_id, out_rec_2.first_name, out_rec_2.last_name, out_rec_2.email, out_rec_2.phone_number; -- input row + EXIT WHEN p2%NOTFOUND; + return next out_rec_1; + END LOOP; + RETURN; +END multi_cursor_srf; +/ +NOTICE: immutable would be set if parallel_enable specified +explain (costs off) select * from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)) limit 10; + QUERY PLAN +----------------------------------------------------- + Limit + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Limit + -> Function Scan on multi_cursor_srf +(4 rows) + +select * from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)) limit 10; + employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+-------------- +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +(10 rows) + +explain (costs off) select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)); + QUERY PLAN +----------------------------------------------------- + Aggregate + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Aggregate + -> Function Scan on multi_cursor_srf +(4 rows) + +select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)); + count +------- + 100 +(1 row) + +-- nested function call +explain (costs off) select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10; + QUERY PLAN +---------------------------------------------- + Limit + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Limit + -> Function Scan on hash_srf +(4 rows) + +select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10; + employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+-------------- +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +(10 rows) + +-- functionscan join +explain (costs off) select * from hash_srf(cursor (select * from employees)) a, hash_srf(cursor (select * from employees)) b limit 10; + QUERY PLAN +------------------------------------------------------------- + Limit + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Limit + -> Nested Loop + -> Streaming(type: BROADCAST dop: 2/2) + -> Function Scan on hash_srf a + -> Function Scan on hash_srf b +(7 rows) + +select * from hash_srf(cursor (select * from employees)) a, hash_srf(cursor (select * from employees)) b limit 10; + employee_id | department_id | first_name | last_name | email | phone_number | employee_id | department_id | first_name | last_name | email | phone_number +-------------+---------------+------------+-----------+-------+--------------+-------------+---------------+------------+-----------+-------+-------------- +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +--?* +(10 rows) + +-- targetlist +explain (costs off) select hash_srf(cursor (select * from employees)) limit 10; + QUERY PLAN +-------------- + Limit + -> Result +(2 rows) + +select hash_srf(cursor (select * from employees)) limit 10; + hash_srf +------------------------- + (1,60,abc,def,123,123) + (2,60,abc,def,123,123) + (3,60,abc,def,123,123) + (4,60,abc,def,123,123) + (5,60,abc,def,123,123) + (6,60,abc,def,123,123) + (7,60,abc,def,123,123) + (8,60,abc,def,123,123) + (9,60,abc,def,123,123) + (10,60,abc,def,123,123) +(10 rows) + +-- subquery cannot smp +explain (costs off) select 1, (select count(*) from hash_srf(cursor (select * from employees))); + QUERY PLAN +----------------------------------------- + Result + InitPlan 1 (returns $0) + -> Aggregate + -> Function Scan on hash_srf +(4 rows) + +select 1, (select count(*) from hash_srf(cursor (select * from employees))); + ?column? | count +----------+------- + 1 | 100 +(1 row) + +-- test create or replace +CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END any_srf; +/ +select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'any_srf'::regproc; + parallel_cursor_seq | parallel_cursor_strategy | parallel_cursor_partkey +---------------------+--------------------------+------------------------- +(0 rows) + +CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END any_srf; +/ +NOTICE: immutable would be set if parallel_enable specified +select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'any_srf'::regproc; + parallel_cursor_seq | parallel_cursor_strategy | parallel_cursor_partkey +---------------------+--------------------------+------------------------- + 0 | 0 | {} +(1 row) + +-- set provolatile. stable/volatile with parallel_enable would throw error +CREATE OR REPLACE FUNCTION stable_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ stable parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END stable_f; +/ +ERROR: only immutable can be set if parallel_enable specified +CREATE OR REPLACE FUNCTION volatile_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ volatile parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END volatile_f; +/ +ERROR: only immutable can be set if parallel_enable specified +CREATE OR REPLACE FUNCTION immutable_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ immutable parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END immutable_f; +/ +-- Alter Function set volatile/stable would clear parallel_cursor info +alter function immutable_f(p SYS_REFCURSOR) volatile; +select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'immutable_f'::regproc; + parallel_cursor_seq | parallel_cursor_strategy | parallel_cursor_partkey +---------------------+--------------------------+------------------------- +(0 rows) + +alter function immutable_f(p SYS_REFCURSOR) stable; +alter function immutable_f(p SYS_REFCURSOR) immutable; +-- throw error when the operation of parallel cursor is not FETCH CURSOR +CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH absolute 5 from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_opr_f; +/ +NOTICE: immutable would be set if parallel_enable specified +ERROR: only support FETCH CURSOR for parallel cursor "p" +CONTEXT: compilation of PL/pgSQL function "invalid_opr_f" near line 4 +CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH backward from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_opr_f; +/ +NOTICE: immutable would be set if parallel_enable specified +ERROR: only support FETCH CURSOR for parallel cursor "p" +CONTEXT: compilation of PL/pgSQL function "invalid_opr_f" near line 4 +CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH prior from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_opr_f; +/ +NOTICE: immutable would be set if parallel_enable specified +ERROR: only support FETCH CURSOR for parallel cursor "p" +CONTEXT: compilation of PL/pgSQL function "invalid_opr_f" near line 4 +-- test specified non refcursor type +CREATE OR REPLACE FUNCTION invalid_type_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition a by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_type_f; +/ +NOTICE: immutable would be set if parallel_enable specified +ERROR: partition expr must be cursor-type parameter +CREATE OR REPLACE FUNCTION invalid_type_f (p SYS_REFCURSOR, a int) RETURN setof my_outrec_typ parallel_enable (partition a by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_type_f; +/ +NOTICE: immutable would be set if parallel_enable specified +ERROR: partition expr must be cursor-type parameter +-- create non-SRF/pipelined function +CREATE OR REPLACE FUNCTION return_int_f (p SYS_REFCURSOR) RETURN int parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); + res int := 0; +BEGIN + LOOP + FETCH from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + res := res + 1; + END LOOP; + RETURN res; +END return_int_f; +/ +NOTICE: immutable would be set if parallel_enable specified +explain (costs off) select * from return_int_f(cursor (select * from employees)); + QUERY PLAN +------------------------------- + Function Scan on return_int_f +(1 row) + +select * from return_int_f(cursor (select * from employees)); + return_int_f +-------------- + 100 +(1 row) + +-- declare cursor +begin; +declare xc no scroll cursor for select * from employees; +explain select * from hash_srf('xc'); + QUERY PLAN +------------------------------------------------------------------- + Function Scan on hash_srf (cost=0.25..10.25 rows=1000 width=358) +(1 row) + +end; +-- test bulk collect +CREATE OR REPLACE FUNCTION bulk_collect_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); + emp_tab table_my_outrec_typ; +BEGIN + LOOP + FETCH p bulk collect INTO emp_tab limit 5; -- input row + EXIT WHEN p%NOTFOUND; + out_rec := emp_tab(emp_tab.first); + return next out_rec; + END LOOP; + RETURN; +END bulk_collect_f; +/ +NOTICE: immutable would be set if parallel_enable specified +explain (costs off) select count(*) from bulk_collect_f(cursor (select * from employees)); + QUERY PLAN +--------------------------------------------------- + Aggregate + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Aggregate + -> Function Scan on bulk_collect_f +(4 rows) + +select count(*) from bulk_collect_f(cursor (select * from employees)); + count +------- + 20 +(1 row) + +-- create package +create or replace package my_pkg as + FUNCTION pkg_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any); +end my_pkg; +/ +NOTICE: immutable would be set if parallel_enable specified +create or replace package body my_pkg as + FUNCTION pkg_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END pkg_f; +end my_pkg; +/ +NOTICE: immutable would be set if parallel_enable specified +NOTICE: immutable would be set if parallel_enable specified +explain (costs off) select count(*) from my_pkg.pkg_f(cursor (select * from employees)); + QUERY PLAN +---------------------------------------------- + Aggregate + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Aggregate + -> Function Scan on pkg_f +(4 rows) + +select count(*) from my_pkg.pkg_f(cursor (select * from employees)); + count +------- + 100 +(1 row) + +drop schema parallel_enable_function cascade; +NOTICE: drop cascades to 15 other objects +DETAIL: drop cascades to table employees +drop cascades to type my_outrec_typ +drop cascades to function hash_srf(refcursor) +drop cascades to function multi_partkey_srf(refcursor) +drop cascades to type table_my_outrec_typ +drop cascades to function pipelined_table_f(refcursor) +drop cascades to function pipelined_array_f(refcursor) +drop cascades to function no_partition_srf(refcursor) +drop cascades to function multi_cursor_srf(refcursor,refcursor) +drop cascades to function any_srf(refcursor) +drop cascades to function immutable_f(refcursor) +drop cascades to function return_int_f(refcursor) +drop cascades to function bulk_collect_f(refcursor) +--?drop cascades to package.* +drop cascades to function parallel_enable_function.pkg_f(refcursor) diff --git a/src/test/regress/expected/smp_cursor.out b/src/test/regress/expected/smp_cursor.out new file mode 100644 index 000000000..96b492e4c --- /dev/null +++ b/src/test/regress/expected/smp_cursor.out @@ -0,0 +1,523 @@ +create schema smp_cursor; +set search_path=smp_cursor; +create table t1(a int, b int, c int, d bigint); +insert into t1 values(generate_series(1, 100), generate_series(1, 10), generate_series(1, 2), generate_series(1, 50)); +analyze t1; +set query_dop=1002; +explain (costs off) select * from t1; + QUERY PLAN +---------------------------------------- + Streaming(type: LOCAL GATHER dop: 1/2) + -> Seq Scan on t1 +(2 rows) + +set enable_auto_explain = on; +set auto_explain_level = notice; +-- test cursor smp +begin; +declare xc no scroll cursor for select * from t1; +fetch xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration:.* + + a | b | c | d +---+---+---+--- +--?.* +(1 row) + +end; +-- test plan hint +begin; +declare xc no scroll cursor for select /*+ set(query_dop 1) */ * from t1; +fetch xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select /*+ set(query_dop 1) */ * from t1; +Name: datanode1 +--?Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + + a | b | c | d +---+---+---+--- +--?.* +(1 row) + +end; +set query_dop = 1; +begin; +declare xc no scroll cursor for select /*+ set(query_dop 1002) */ * from t1; +fetch xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select /*+ set(query_dop 1002) */ * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration:.* + + a | b | c | d +---+---+---+--- +--?.* +(1 row) + +end; +-- scroll cursor can not smp +set query_dop = 1002; +begin; +declare xc cursor for select /*+ set(query_dop 1002) */ * from t1; +fetch xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc cursor for select /*+ set(query_dop 1002) */ * from t1; +Name: datanode1 +--?Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + + a | b | c | d +---+---+---+--- + 1 | 1 | 1 | 1 +(1 row) + +end; +-- cursor declared with plpgsql can not smp +declare + cursor xc no scroll is select * from t1; + tmp t1%ROWTYPE; +begin + open xc; + fetch xc into tmp; + close xc; +end; +/ +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: select * from t1 +Name: datanode1 +--?Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +CONTEXT: PL/pgSQL function inline_code_block line 5 at FETCH +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +CONTEXT: PL/pgSQL function inline_code_block line 5 at FETCH +-- test resource conflict checking +begin; +declare xc no scroll cursor for select * from t1; +drop table t1; +ERROR: cannot DROP TABLE "t1" because it is being used by active queries in this session +end; +-- test cursor with hold +begin; +declare xc no scroll cursor with hold for select * from t1; +fetch xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor with hold for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration:.* + + a | b | c | d +---+---+---+--- +--?.* +(1 row) + +end; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor with hold for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration:.* + +fetch absolute 10 xc; + a | b | c | d +----+----+---+---- +--?.* +(1 row) + +close xc; +-- test cursor backward error +begin; +declare xc no scroll cursor for select * from t1; +fetch absolute 10 xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + + a | b | c | d +----+----+---+---- +--?.* +(1 row) + +fetch absolute 9 xc; +ERROR: cursor with stream plan do not support scan backward. +end; +-- test cursor other operate +begin; +declare xc no scroll cursor for select * from t1; +fetch first xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + + a | b | c | d +---+---+---+--- +--?.* +(1 row) + +fetch forward xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + + a | b | c | d +---+---+---+--- +--?.* +(1 row) + +fetch absolute 5 xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + + a | b | c | d +---+---+---+--- +--?.* +(1 row) + +fetch relative 5 xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + + a | b | c | d +----+----+---+---- +--?.* +(1 row) + +fetch all xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + + a | b | c | d +-----+----+---+---- +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +--?.* +(90 rows) + +move xc; +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: declare xc no scroll cursor for select * from t1; +Name: datanode1 +--?Streaming(type: LOCAL GATHER dop: 1/2).* + Output: a, b, c, d + Spawn on: All datanodes + Consumer Nodes: All datanodes +--? -> Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +end; +drop schema smp_cursor cascade; +NOTICE: drop cascades to table t1 diff --git a/src/test/regress/input/gs_dump_package.source b/src/test/regress/input/gs_dump_package.source index ec7da74b5..76f55e960 100644 --- a/src/test/regress/input/gs_dump_package.source +++ b/src/test/regress/input/gs_dump_package.source @@ -105,12 +105,54 @@ va pck9.r1; end pck6; / +-- test parallel_enable +CREATE TYPE my_outrec_typ AS ( + employee_id numeric(6,0), + department_id numeric, + first_name character varying(30), + last_name character varying(30), + email character varying(30), + phone_number character varying(30) +); +create or replace package my_pkg as + FUNCTION pkg_f_1 (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any); + FUNCTION pkg_f_2 (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(a,b)); +end my_pkg; +/ + +create or replace package body my_pkg as + FUNCTION pkg_f_1 (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END pkg_f_1; + FUNCTION pkg_f_2 (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(a,b)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p1 INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p1%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END pkg_f_2; +end my_pkg; +/ + \! @abs_bindir@/gs_dump dump_package_db -p @portstring@ -f @abs_bindir@/dump_package.tar -F t >/dev/null 2>&1; echo $? \! @abs_bindir@/gs_restore -d restore_package_db -p @portstring@ @abs_bindir@/dump_package.tar >/dev/null 2>&1; echo $? \c restore_package_db call rowtype_pckg.rowtype_func(); +\sf my_pkg.pkg_f_1 +\sf my_pkg.pkg_f_2 + \c regression drop database if exists restore_subpartition_db; drop database if exists dump_subpartition_db; diff --git a/src/test/regress/output/gs_dump_package.source b/src/test/regress/output/gs_dump_package.source index 6dcaa1213..e18fab6ec 100644 --- a/src/test/regress/output/gs_dump_package.source +++ b/src/test/regress/output/gs_dump_package.source @@ -100,6 +100,49 @@ create or replace package pck6 is va pck9.r1; end pck6; / +-- test parallel_enable +CREATE TYPE my_outrec_typ AS ( + employee_id numeric(6,0), + department_id numeric, + first_name character varying(30), + last_name character varying(30), + email character varying(30), + phone_number character varying(30) +); +create or replace package my_pkg as + FUNCTION pkg_f_1 (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any); + FUNCTION pkg_f_2 (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(a,b)); +end my_pkg; +/ +NOTICE: immutable would be set if parallel_enable specified +NOTICE: immutable would be set if parallel_enable specified +create or replace package body my_pkg as + FUNCTION pkg_f_1 (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END pkg_f_1; + FUNCTION pkg_f_2 (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(a,b)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p1 INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p1%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END pkg_f_2; +end my_pkg; +/ +NOTICE: immutable would be set if parallel_enable specified +NOTICE: immutable would be set if parallel_enable specified +NOTICE: immutable would be set if parallel_enable specified +NOTICE: immutable would be set if parallel_enable specified \! @abs_bindir@/gs_dump dump_package_db -p @portstring@ -f @abs_bindir@/dump_package.tar -F t >/dev/null 2>&1; echo $? 0 \! @abs_bindir@/gs_restore -d restore_package_db -p @portstring@ @abs_bindir@/dump_package.tar >/dev/null 2>&1; echo $? @@ -112,6 +155,32 @@ call rowtype_pckg.rowtype_func(); 2 | b (2 rows) +\sf my_pkg.pkg_f_1 +CREATE OR REPLACE FUNCTION public.my_pkg.pkg_f_1(p SYS_REFCURSOR) + RETURN SETOF my_outrec_typ IMMUTABLE NOT FENCED NOT SHIPPABLE PARALLEL_ENABLE (PARTITION p BY ANY) PACKAGE +AS DECLARE out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END ; +/ +\sf my_pkg.pkg_f_2 +CREATE OR REPLACE FUNCTION public.my_pkg.pkg_f_2(p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) + RETURN SETOF my_outrec_typ IMMUTABLE NOT FENCED NOT SHIPPABLE PARALLEL_ENABLE (PARTITION p1 BY HASH(a,b)) PACKAGE +AS DECLARE out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p1 INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p1%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END ; +/ \c regression drop database if exists restore_subpartition_db; NOTICE: database "restore_subpartition_db" does not exist, skipping diff --git a/src/test/regress/parallel_schedule0 b/src/test/regress/parallel_schedule0 index 7684bd914..133fa781c 100644 --- a/src/test/regress/parallel_schedule0 +++ b/src/test/regress/parallel_schedule0 @@ -42,7 +42,7 @@ test: extract_pushdown_or_clause test: workload_manager test: spm_adaptive_gplan -test: smp +test: smp smp_cursor parallel_enable_function test: alter_hw_package test: hw_grant_package gsc_func gsc_db test: uppercase_attribute_name decode_compatible_with_o outerjoin_bugfix chr_gbk diff --git a/src/test/regress/parallel_schedule0A b/src/test/regress/parallel_schedule0A index 38984f378..1f9aadadf 100644 --- a/src/test/regress/parallel_schedule0A +++ b/src/test/regress/parallel_schedule0A @@ -33,7 +33,7 @@ test: extract_pushdown_or_clause test: workload_manager test: spm_adaptive_gplan -test: smp +test: smp smp_cursor parallel_enable_function test: alter_hw_package test: hw_grant_package gsc_func gsc_db test: uppercase_attribute_name decode_compatible_with_o outerjoin_bugfix diff --git a/src/test/regress/sql/parallel_enable_function.sql b/src/test/regress/sql/parallel_enable_function.sql new file mode 100644 index 000000000..21ceed582 --- /dev/null +++ b/src/test/regress/sql/parallel_enable_function.sql @@ -0,0 +1,362 @@ +create schema parallel_enable_function; +set search_path=parallel_enable_function; + +create table employees (employee_id number(6), department_id NUMBER, first_name varchar2(30), last_name varchar2(30), email varchar2(30), phone_number varchar2(30)); + +BEGIN + FOR i IN 1..100 LOOP + INSERT INTO employees VALUES (i, 60, 'abc', 'def', '123', '123'); + END LOOP; + COMMIT; +END; +/ + +CREATE TYPE my_outrec_typ AS ( + employee_id numeric(6,0), + department_id numeric, + first_name character varying(30), + last_name character varying(30), + email character varying(30), + phone_number character varying(30) +); + + +-- create srf function with parallel_enable +CREATE OR REPLACE FUNCTION hash_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END hash_srf; +/ + +CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END any_srf; +/ + +-- create function with multi-partkey +CREATE OR REPLACE FUNCTION multi_partkey_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id, department_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END multi_partkey_srf; +/ + +-- create pipelined function +create type table_my_outrec_typ is table of my_outrec_typ; + +CREATE OR REPLACE FUNCTION pipelined_table_f (p SYS_REFCURSOR) RETURN table_my_outrec_typ pipelined parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + pipe row(out_rec); + END LOOP; +END pipelined_table_f; +/ + +CREATE OR REPLACE FUNCTION pipelined_array_f (p SYS_REFCURSOR) RETURN _employees PIPELINED parallel_enable (partition p by any) + IS + in_rec my_outrec_typ; + BEGIN +LOOP + FETCH p INTO in_rec; + EXIT WHEN p%NOTFOUND; + PIPE ROW (in_rec); + END LOOP; +END pipelined_array_f; +/ + +-- without partition by +CREATE OR REPLACE FUNCTION no_partition_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END no_partition_srf; +/ + +-- call function +set query_dop = 1002; +explain (costs off) select * from hash_srf(cursor (select * from employees)) limit 10; +select * from hash_srf(cursor (select * from employees)) limit 10; + +explain (costs off) select * from any_srf(cursor (select * from employees)) limit 10; +select * from any_srf(cursor (select * from employees)) limit 10; + +explain (costs off) select * from pipelined_table_f(cursor (select * from employees)) limit 10; +select * from pipelined_table_f(cursor (select * from employees)) limit 10; + +explain (costs off) select * from multi_partkey_srf(cursor (select * from employees)) limit 10; +select * from multi_partkey_srf(cursor (select * from employees)) limit 10; + +explain (costs off) select * from pipelined_array_f(cursor (select * from employees)) limit 10; +select * from pipelined_array_f(cursor (select * from employees)) limit 10; + +explain (costs off) select * from no_partition_srf(cursor (select * from employees)) limit 10; +select * from no_partition_srf(cursor (select * from employees)) limit 10; + +-- test count(*) +explain (costs off) select count(*) from hash_srf(cursor (select * from employees)); +select count(*) from hash_srf(cursor (select * from employees)); + +-- test multi cursor args +CREATE OR REPLACE FUNCTION multi_cursor_srf (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(employee_id)) IS + out_rec_1 my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); + out_rec_2 my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p1 INTO out_rec_1.employee_id, out_rec_1.department_id, out_rec_1.first_name, out_rec_1.last_name, out_rec_1.email, out_rec_1.phone_number; -- input row + EXIT WHEN p1%NOTFOUND; + FETCH p2 INTO out_rec_2.employee_id, out_rec_2.department_id, out_rec_2.first_name, out_rec_2.last_name, out_rec_2.email, out_rec_2.phone_number; -- input row + EXIT WHEN p2%NOTFOUND; + return next out_rec_1; + END LOOP; + RETURN; +END multi_cursor_srf; +/ + +explain (costs off) select * from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)) limit 10; +select * from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)) limit 10; + +explain (costs off) select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)); +select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)); + +-- nested function call +explain (costs off) select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10; +select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10; + +-- functionscan join +explain (costs off) select * from hash_srf(cursor (select * from employees)) a, hash_srf(cursor (select * from employees)) b limit 10; +select * from hash_srf(cursor (select * from employees)) a, hash_srf(cursor (select * from employees)) b limit 10; + +-- targetlist +explain (costs off) select hash_srf(cursor (select * from employees)) limit 10; +select hash_srf(cursor (select * from employees)) limit 10; + +-- subquery cannot smp +explain (costs off) select 1, (select count(*) from hash_srf(cursor (select * from employees))); +select 1, (select count(*) from hash_srf(cursor (select * from employees))); + +-- test create or replace +CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END any_srf; +/ + +select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'any_srf'::regproc; + +CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END any_srf; +/ + +select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'any_srf'::regproc; + +-- set provolatile. stable/volatile with parallel_enable would throw error +CREATE OR REPLACE FUNCTION stable_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ stable parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END stable_f; +/ + +CREATE OR REPLACE FUNCTION volatile_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ volatile parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END volatile_f; +/ + +CREATE OR REPLACE FUNCTION immutable_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ immutable parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END immutable_f; +/ + +-- Alter Function set volatile/stable would clear parallel_cursor info +alter function immutable_f(p SYS_REFCURSOR) volatile; +select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'immutable_f'::regproc; + +alter function immutable_f(p SYS_REFCURSOR) stable; +alter function immutable_f(p SYS_REFCURSOR) immutable; + +-- throw error when the operation of parallel cursor is not FETCH CURSOR +CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH absolute 5 from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_opr_f; +/ + +CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH backward from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_opr_f; +/ + +CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH prior from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_opr_f; +/ + +-- test specified non refcursor type +CREATE OR REPLACE FUNCTION invalid_type_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition a by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_type_f; +/ + +CREATE OR REPLACE FUNCTION invalid_type_f (p SYS_REFCURSOR, a int) RETURN setof my_outrec_typ parallel_enable (partition a by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END invalid_type_f; +/ + +-- create non-SRF/pipelined function +CREATE OR REPLACE FUNCTION return_int_f (p SYS_REFCURSOR) RETURN int parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); + res int := 0; +BEGIN + LOOP + FETCH from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + res := res + 1; + END LOOP; + RETURN res; +END return_int_f; +/ + +explain (costs off) select * from return_int_f(cursor (select * from employees)); +select * from return_int_f(cursor (select * from employees)); + +-- declare cursor +begin; +declare xc no scroll cursor for select * from employees; +explain select * from hash_srf('xc'); +end; + +-- test bulk collect +CREATE OR REPLACE FUNCTION bulk_collect_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); + emp_tab table_my_outrec_typ; +BEGIN + LOOP + FETCH p bulk collect INTO emp_tab limit 5; -- input row + EXIT WHEN p%NOTFOUND; + out_rec := emp_tab(emp_tab.first); + return next out_rec; + END LOOP; + RETURN; +END bulk_collect_f; +/ + +explain (costs off) select count(*) from bulk_collect_f(cursor (select * from employees)); +select count(*) from bulk_collect_f(cursor (select * from employees)); + +-- create package +create or replace package my_pkg as + FUNCTION pkg_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any); +end my_pkg; +/ + +create or replace package body my_pkg as + FUNCTION pkg_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS + out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL); +BEGIN + LOOP + FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row + EXIT WHEN p%NOTFOUND; + return next out_rec; + END LOOP; + RETURN; +END pkg_f; +end my_pkg; +/ + +explain (costs off) select count(*) from my_pkg.pkg_f(cursor (select * from employees)); +select count(*) from my_pkg.pkg_f(cursor (select * from employees)); + +drop schema parallel_enable_function cascade; diff --git a/src/test/regress/sql/smp_cursor.sql b/src/test/regress/sql/smp_cursor.sql new file mode 100644 index 000000000..8334505c5 --- /dev/null +++ b/src/test/regress/sql/smp_cursor.sql @@ -0,0 +1,82 @@ +create schema smp_cursor; +set search_path=smp_cursor; + +create table t1(a int, b int, c int, d bigint); +insert into t1 values(generate_series(1, 100), generate_series(1, 10), generate_series(1, 2), generate_series(1, 50)); +analyze t1; + +set query_dop=1002; + +explain (costs off) select * from t1; + +set enable_auto_explain = on; +set auto_explain_level = notice; +-- test cursor smp +begin; +declare xc no scroll cursor for select * from t1; +fetch xc; +end; + +-- test plan hint +begin; +declare xc no scroll cursor for select /*+ set(query_dop 1) */ * from t1; +fetch xc; +end; + +set query_dop = 1; +begin; +declare xc no scroll cursor for select /*+ set(query_dop 1002) */ * from t1; +fetch xc; +end; + +-- scroll cursor can not smp +set query_dop = 1002; +begin; +declare xc cursor for select /*+ set(query_dop 1002) */ * from t1; +fetch xc; +end; + +-- cursor declared with plpgsql can not smp +declare + cursor xc no scroll is select * from t1; + tmp t1%ROWTYPE; +begin + open xc; + fetch xc into tmp; + close xc; +end; +/ + +-- test resource conflict checking +begin; +declare xc no scroll cursor for select * from t1; +drop table t1; +end; + +-- test cursor with hold +begin; +declare xc no scroll cursor with hold for select * from t1; +fetch xc; +end; +fetch absolute 10 xc; +close xc; + +-- test cursor backward error +begin; +declare xc no scroll cursor for select * from t1; +fetch absolute 10 xc; +fetch absolute 9 xc; +end; + +-- test cursor other operate +begin; +declare xc no scroll cursor for select * from t1; +fetch first xc; +fetch forward xc; +fetch absolute 5 xc; +fetch relative 5 xc; +fetch all xc; +move xc; +end; + +drop schema smp_cursor cascade; \ No newline at end of file