add testcase smp_cursor&&parallel_enable_function

This commit is contained in:
chenxiaobin19
2024-07-15 15:14:46 +08:00
parent 2afbe6fb56
commit 584d420639
8 changed files with 1750 additions and 2 deletions

View File

@ -0,0 +1,670 @@
create schema parallel_enable_function;
set search_path=parallel_enable_function;
create table employees (employee_id number(6), department_id NUMBER, first_name varchar2(30), last_name varchar2(30), email varchar2(30), phone_number varchar2(30));
BEGIN
FOR i IN 1..100 LOOP
INSERT INTO employees VALUES (i, 60, 'abc', 'def', '123', '123');
END LOOP;
COMMIT;
END;
/
CREATE TYPE my_outrec_typ AS (
employee_id numeric(6,0),
department_id numeric,
first_name character varying(30),
last_name character varying(30),
email character varying(30),
phone_number character varying(30)
);
-- create srf function with parallel_enable
CREATE OR REPLACE FUNCTION hash_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END hash_srf;
/
NOTICE: immutable would be set if parallel_enable specified
CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END any_srf;
/
NOTICE: immutable would be set if parallel_enable specified
-- create function with multi-partkey
CREATE OR REPLACE FUNCTION multi_partkey_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id, department_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END multi_partkey_srf;
/
NOTICE: immutable would be set if parallel_enable specified
-- create pipelined function
create type table_my_outrec_typ is table of my_outrec_typ;
CREATE OR REPLACE FUNCTION pipelined_table_f (p SYS_REFCURSOR) RETURN table_my_outrec_typ pipelined parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
pipe row(out_rec);
END LOOP;
END pipelined_table_f;
/
NOTICE: immutable would be set if parallel_enable specified
CREATE OR REPLACE FUNCTION pipelined_array_f (p SYS_REFCURSOR) RETURN _employees PIPELINED parallel_enable (partition p by any)
IS
in_rec my_outrec_typ;
BEGIN
LOOP
FETCH p INTO in_rec;
EXIT WHEN p%NOTFOUND;
PIPE ROW (in_rec);
END LOOP;
END pipelined_array_f;
/
NOTICE: immutable would be set if parallel_enable specified
-- without partition by
CREATE OR REPLACE FUNCTION no_partition_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END no_partition_srf;
/
NOTICE: immutable would be set if parallel_enable specified
-- call function
set query_dop = 1002;
explain (costs off) select * from hash_srf(cursor (select * from employees)) limit 10;
QUERY PLAN
----------------------------------------------
Limit
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Limit
-> Function Scan on hash_srf
(4 rows)
select * from hash_srf(cursor (select * from employees)) limit 10;
employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
(10 rows)
explain (costs off) select * from any_srf(cursor (select * from employees)) limit 10;
QUERY PLAN
----------------------------------------------
Limit
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Limit
-> Function Scan on any_srf
(4 rows)
select * from any_srf(cursor (select * from employees)) limit 10;
employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
(10 rows)
explain (costs off) select * from pipelined_table_f(cursor (select * from employees)) limit 10;
QUERY PLAN
------------------------------------------------------
Limit
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Limit
-> Function Scan on pipelined_table_f
(4 rows)
select * from pipelined_table_f(cursor (select * from employees)) limit 10;
employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
(10 rows)
explain (costs off) select * from multi_partkey_srf(cursor (select * from employees)) limit 10;
QUERY PLAN
------------------------------------------------------
Limit
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Limit
-> Function Scan on multi_partkey_srf
(4 rows)
select * from multi_partkey_srf(cursor (select * from employees)) limit 10;
employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
(10 rows)
explain (costs off) select * from pipelined_array_f(cursor (select * from employees)) limit 10;
QUERY PLAN
------------------------------------------------------
Limit
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Limit
-> Function Scan on pipelined_array_f
(4 rows)
select * from pipelined_array_f(cursor (select * from employees)) limit 10;
employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
(10 rows)
explain (costs off) select * from no_partition_srf(cursor (select * from employees)) limit 10;
QUERY PLAN
-----------------------------------------
Limit
-> Function Scan on no_partition_srf
(2 rows)
select * from no_partition_srf(cursor (select * from employees)) limit 10;
employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------
1 | 60 | abc | def | 123 | 123
2 | 60 | abc | def | 123 | 123
3 | 60 | abc | def | 123 | 123
4 | 60 | abc | def | 123 | 123
5 | 60 | abc | def | 123 | 123
6 | 60 | abc | def | 123 | 123
7 | 60 | abc | def | 123 | 123
8 | 60 | abc | def | 123 | 123
9 | 60 | abc | def | 123 | 123
10 | 60 | abc | def | 123 | 123
(10 rows)
-- test count(*)
explain (costs off) select count(*) from hash_srf(cursor (select * from employees));
QUERY PLAN
----------------------------------------------
Aggregate
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Aggregate
-> Function Scan on hash_srf
(4 rows)
select count(*) from hash_srf(cursor (select * from employees));
count
-------
100
(1 row)
-- test multi cursor args
CREATE OR REPLACE FUNCTION multi_cursor_srf (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(employee_id)) IS
out_rec_1 my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
out_rec_2 my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p1 INTO out_rec_1.employee_id, out_rec_1.department_id, out_rec_1.first_name, out_rec_1.last_name, out_rec_1.email, out_rec_1.phone_number; -- input row
EXIT WHEN p1%NOTFOUND;
FETCH p2 INTO out_rec_2.employee_id, out_rec_2.department_id, out_rec_2.first_name, out_rec_2.last_name, out_rec_2.email, out_rec_2.phone_number; -- input row
EXIT WHEN p2%NOTFOUND;
return next out_rec_1;
END LOOP;
RETURN;
END multi_cursor_srf;
/
NOTICE: immutable would be set if parallel_enable specified
explain (costs off) select * from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)) limit 10;
QUERY PLAN
-----------------------------------------------------
Limit
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Limit
-> Function Scan on multi_cursor_srf
(4 rows)
select * from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)) limit 10;
employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
(10 rows)
explain (costs off) select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees));
QUERY PLAN
-----------------------------------------------------
Aggregate
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Aggregate
-> Function Scan on multi_cursor_srf
(4 rows)
select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees));
count
-------
100
(1 row)
-- nested function call
explain (costs off) select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10;
QUERY PLAN
----------------------------------------------
Limit
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Limit
-> Function Scan on hash_srf
(4 rows)
select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10;
employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
(10 rows)
-- functionscan join
explain (costs off) select * from hash_srf(cursor (select * from employees)) a, hash_srf(cursor (select * from employees)) b limit 10;
QUERY PLAN
-------------------------------------------------------------
Limit
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Limit
-> Nested Loop
-> Streaming(type: BROADCAST dop: 2/2)
-> Function Scan on hash_srf a
-> Function Scan on hash_srf b
(7 rows)
select * from hash_srf(cursor (select * from employees)) a, hash_srf(cursor (select * from employees)) b limit 10;
employee_id | department_id | first_name | last_name | email | phone_number | employee_id | department_id | first_name | last_name | email | phone_number
-------------+---------------+------------+-----------+-------+--------------+-------------+---------------+------------+-----------+-------+--------------
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
--?*
(10 rows)
-- targetlist
explain (costs off) select hash_srf(cursor (select * from employees)) limit 10;
QUERY PLAN
--------------
Limit
-> Result
(2 rows)
select hash_srf(cursor (select * from employees)) limit 10;
hash_srf
-------------------------
(1,60,abc,def,123,123)
(2,60,abc,def,123,123)
(3,60,abc,def,123,123)
(4,60,abc,def,123,123)
(5,60,abc,def,123,123)
(6,60,abc,def,123,123)
(7,60,abc,def,123,123)
(8,60,abc,def,123,123)
(9,60,abc,def,123,123)
(10,60,abc,def,123,123)
(10 rows)
-- subquery cannot smp
explain (costs off) select 1, (select count(*) from hash_srf(cursor (select * from employees)));
QUERY PLAN
-----------------------------------------
Result
InitPlan 1 (returns $0)
-> Aggregate
-> Function Scan on hash_srf
(4 rows)
select 1, (select count(*) from hash_srf(cursor (select * from employees)));
?column? | count
----------+-------
1 | 100
(1 row)
-- test create or replace
CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END any_srf;
/
select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'any_srf'::regproc;
parallel_cursor_seq | parallel_cursor_strategy | parallel_cursor_partkey
---------------------+--------------------------+-------------------------
(0 rows)
CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END any_srf;
/
NOTICE: immutable would be set if parallel_enable specified
select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'any_srf'::regproc;
parallel_cursor_seq | parallel_cursor_strategy | parallel_cursor_partkey
---------------------+--------------------------+-------------------------
0 | 0 | {}
(1 row)
-- set provolatile. stable/volatile with parallel_enable would throw error
CREATE OR REPLACE FUNCTION stable_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ stable parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END stable_f;
/
ERROR: only immutable can be set if parallel_enable specified
CREATE OR REPLACE FUNCTION volatile_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ volatile parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END volatile_f;
/
ERROR: only immutable can be set if parallel_enable specified
CREATE OR REPLACE FUNCTION immutable_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ immutable parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END immutable_f;
/
-- Alter Function set volatile/stable would clear parallel_cursor info
alter function immutable_f(p SYS_REFCURSOR) volatile;
select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'immutable_f'::regproc;
parallel_cursor_seq | parallel_cursor_strategy | parallel_cursor_partkey
---------------------+--------------------------+-------------------------
(0 rows)
alter function immutable_f(p SYS_REFCURSOR) stable;
alter function immutable_f(p SYS_REFCURSOR) immutable;
-- throw error when the operation of parallel cursor is not FETCH CURSOR
CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH absolute 5 from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_opr_f;
/
NOTICE: immutable would be set if parallel_enable specified
ERROR: only support FETCH CURSOR for parallel cursor "p"
CONTEXT: compilation of PL/pgSQL function "invalid_opr_f" near line 4
CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH backward from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_opr_f;
/
NOTICE: immutable would be set if parallel_enable specified
ERROR: only support FETCH CURSOR for parallel cursor "p"
CONTEXT: compilation of PL/pgSQL function "invalid_opr_f" near line 4
CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH prior from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_opr_f;
/
NOTICE: immutable would be set if parallel_enable specified
ERROR: only support FETCH CURSOR for parallel cursor "p"
CONTEXT: compilation of PL/pgSQL function "invalid_opr_f" near line 4
-- test specified non refcursor type
CREATE OR REPLACE FUNCTION invalid_type_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition a by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_type_f;
/
NOTICE: immutable would be set if parallel_enable specified
ERROR: partition expr must be cursor-type parameter
CREATE OR REPLACE FUNCTION invalid_type_f (p SYS_REFCURSOR, a int) RETURN setof my_outrec_typ parallel_enable (partition a by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_type_f;
/
NOTICE: immutable would be set if parallel_enable specified
ERROR: partition expr must be cursor-type parameter
-- create non-SRF/pipelined function
CREATE OR REPLACE FUNCTION return_int_f (p SYS_REFCURSOR) RETURN int parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
res int := 0;
BEGIN
LOOP
FETCH from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
res := res + 1;
END LOOP;
RETURN res;
END return_int_f;
/
NOTICE: immutable would be set if parallel_enable specified
explain (costs off) select * from return_int_f(cursor (select * from employees));
QUERY PLAN
-------------------------------
Function Scan on return_int_f
(1 row)
select * from return_int_f(cursor (select * from employees));
return_int_f
--------------
100
(1 row)
-- declare cursor
begin;
declare xc no scroll cursor for select * from employees;
explain select * from hash_srf('xc');
QUERY PLAN
-------------------------------------------------------------------
Function Scan on hash_srf (cost=0.25..10.25 rows=1000 width=358)
(1 row)
end;
-- test bulk collect
CREATE OR REPLACE FUNCTION bulk_collect_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
emp_tab table_my_outrec_typ;
BEGIN
LOOP
FETCH p bulk collect INTO emp_tab limit 5; -- input row
EXIT WHEN p%NOTFOUND;
out_rec := emp_tab(emp_tab.first);
return next out_rec;
END LOOP;
RETURN;
END bulk_collect_f;
/
NOTICE: immutable would be set if parallel_enable specified
explain (costs off) select count(*) from bulk_collect_f(cursor (select * from employees));
QUERY PLAN
---------------------------------------------------
Aggregate
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Aggregate
-> Function Scan on bulk_collect_f
(4 rows)
select count(*) from bulk_collect_f(cursor (select * from employees));
count
-------
20
(1 row)
-- create package
create or replace package my_pkg as
FUNCTION pkg_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any);
end my_pkg;
/
NOTICE: immutable would be set if parallel_enable specified
create or replace package body my_pkg as
FUNCTION pkg_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END pkg_f;
end my_pkg;
/
NOTICE: immutable would be set if parallel_enable specified
NOTICE: immutable would be set if parallel_enable specified
explain (costs off) select count(*) from my_pkg.pkg_f(cursor (select * from employees));
QUERY PLAN
----------------------------------------------
Aggregate
-> Streaming(type: LOCAL GATHER dop: 1/2)
-> Aggregate
-> Function Scan on pkg_f
(4 rows)
select count(*) from my_pkg.pkg_f(cursor (select * from employees));
count
-------
100
(1 row)
drop schema parallel_enable_function cascade;
NOTICE: drop cascades to 15 other objects
DETAIL: drop cascades to table employees
drop cascades to type my_outrec_typ
drop cascades to function hash_srf(refcursor)
drop cascades to function multi_partkey_srf(refcursor)
drop cascades to type table_my_outrec_typ
drop cascades to function pipelined_table_f(refcursor)
drop cascades to function pipelined_array_f(refcursor)
drop cascades to function no_partition_srf(refcursor)
drop cascades to function multi_cursor_srf(refcursor,refcursor)
drop cascades to function any_srf(refcursor)
drop cascades to function immutable_f(refcursor)
drop cascades to function return_int_f(refcursor)
drop cascades to function bulk_collect_f(refcursor)
--?drop cascades to package.*
drop cascades to function parallel_enable_function.pkg_f(refcursor)

View File

@ -0,0 +1,523 @@
create schema smp_cursor;
set search_path=smp_cursor;
create table t1(a int, b int, c int, d bigint);
insert into t1 values(generate_series(1, 100), generate_series(1, 10), generate_series(1, 2), generate_series(1, 50));
analyze t1;
set query_dop=1002;
explain (costs off) select * from t1;
QUERY PLAN
----------------------------------------
Streaming(type: LOCAL GATHER dop: 1/2)
-> Seq Scan on t1
(2 rows)
set enable_auto_explain = on;
set auto_explain_level = notice;
-- test cursor smp
begin;
declare xc no scroll cursor for select * from t1;
fetch xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration:.*
a | b | c | d
---+---+---+---
--?.*
(1 row)
end;
-- test plan hint
begin;
declare xc no scroll cursor for select /*+ set(query_dop 1) */ * from t1;
fetch xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select /*+ set(query_dop 1) */ * from t1;
Name: datanode1
--?Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
a | b | c | d
---+---+---+---
--?.*
(1 row)
end;
set query_dop = 1;
begin;
declare xc no scroll cursor for select /*+ set(query_dop 1002) */ * from t1;
fetch xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select /*+ set(query_dop 1002) */ * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration:.*
a | b | c | d
---+---+---+---
--?.*
(1 row)
end;
-- scroll cursor can not smp
set query_dop = 1002;
begin;
declare xc cursor for select /*+ set(query_dop 1002) */ * from t1;
fetch xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc cursor for select /*+ set(query_dop 1002) */ * from t1;
Name: datanode1
--?Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
a | b | c | d
---+---+---+---
1 | 1 | 1 | 1
(1 row)
end;
-- cursor declared with plpgsql can not smp
declare
cursor xc no scroll is select * from t1;
tmp t1%ROWTYPE;
begin
open xc;
fetch xc into tmp;
close xc;
end;
/
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: select * from t1
Name: datanode1
--?Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
CONTEXT: PL/pgSQL function inline_code_block line 5 at FETCH
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
CONTEXT: PL/pgSQL function inline_code_block line 5 at FETCH
-- test resource conflict checking
begin;
declare xc no scroll cursor for select * from t1;
drop table t1;
ERROR: cannot DROP TABLE "t1" because it is being used by active queries in this session
end;
-- test cursor with hold
begin;
declare xc no scroll cursor with hold for select * from t1;
fetch xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor with hold for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration:.*
a | b | c | d
---+---+---+---
--?.*
(1 row)
end;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor with hold for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration:.*
fetch absolute 10 xc;
a | b | c | d
----+----+---+----
--?.*
(1 row)
close xc;
-- test cursor backward error
begin;
declare xc no scroll cursor for select * from t1;
fetch absolute 10 xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
a | b | c | d
----+----+---+----
--?.*
(1 row)
fetch absolute 9 xc;
ERROR: cursor with stream plan do not support scan backward.
end;
-- test cursor other operate
begin;
declare xc no scroll cursor for select * from t1;
fetch first xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
a | b | c | d
---+---+---+---
--?.*
(1 row)
fetch forward xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
a | b | c | d
---+---+---+---
--?.*
(1 row)
fetch absolute 5 xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
a | b | c | d
---+---+---+---
--?.*
(1 row)
fetch relative 5 xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
a | b | c | d
----+----+---+----
--?.*
(1 row)
fetch all xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
a | b | c | d
-----+----+---+----
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
--?.*
(90 rows)
move xc;
NOTICE:
QueryPlan
----------------------------NestLevel:0----------------------------
Query Text: declare xc no scroll cursor for select * from t1;
Name: datanode1
--?Streaming(type: LOCAL GATHER dop: 1/2).*
Output: a, b, c, d
Spawn on: All datanodes
Consumer Nodes: All datanodes
--? -> Seq Scan on smp_cursor.t1.*
Output: a, b, c, d
NOTICE:
----------------------------NestLevel:0----------------------------
--?duration.*
end;
drop schema smp_cursor cascade;
NOTICE: drop cascades to table t1

View File

@ -105,12 +105,54 @@ va pck9.r1;
end pck6;
/
-- test parallel_enable
CREATE TYPE my_outrec_typ AS (
employee_id numeric(6,0),
department_id numeric,
first_name character varying(30),
last_name character varying(30),
email character varying(30),
phone_number character varying(30)
);
create or replace package my_pkg as
FUNCTION pkg_f_1 (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any);
FUNCTION pkg_f_2 (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(a,b));
end my_pkg;
/
create or replace package body my_pkg as
FUNCTION pkg_f_1 (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END pkg_f_1;
FUNCTION pkg_f_2 (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(a,b)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p1 INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p1%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END pkg_f_2;
end my_pkg;
/
\! @abs_bindir@/gs_dump dump_package_db -p @portstring@ -f @abs_bindir@/dump_package.tar -F t >/dev/null 2>&1; echo $?
\! @abs_bindir@/gs_restore -d restore_package_db -p @portstring@ @abs_bindir@/dump_package.tar >/dev/null 2>&1; echo $?
\c restore_package_db
call rowtype_pckg.rowtype_func();
\sf my_pkg.pkg_f_1
\sf my_pkg.pkg_f_2
\c regression
drop database if exists restore_subpartition_db;
drop database if exists dump_subpartition_db;

View File

@ -100,6 +100,49 @@ create or replace package pck6 is
va pck9.r1;
end pck6;
/
-- test parallel_enable
CREATE TYPE my_outrec_typ AS (
employee_id numeric(6,0),
department_id numeric,
first_name character varying(30),
last_name character varying(30),
email character varying(30),
phone_number character varying(30)
);
create or replace package my_pkg as
FUNCTION pkg_f_1 (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any);
FUNCTION pkg_f_2 (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(a,b));
end my_pkg;
/
NOTICE: immutable would be set if parallel_enable specified
NOTICE: immutable would be set if parallel_enable specified
create or replace package body my_pkg as
FUNCTION pkg_f_1 (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END pkg_f_1;
FUNCTION pkg_f_2 (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(a,b)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p1 INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p1%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END pkg_f_2;
end my_pkg;
/
NOTICE: immutable would be set if parallel_enable specified
NOTICE: immutable would be set if parallel_enable specified
NOTICE: immutable would be set if parallel_enable specified
NOTICE: immutable would be set if parallel_enable specified
\! @abs_bindir@/gs_dump dump_package_db -p @portstring@ -f @abs_bindir@/dump_package.tar -F t >/dev/null 2>&1; echo $?
0
\! @abs_bindir@/gs_restore -d restore_package_db -p @portstring@ @abs_bindir@/dump_package.tar >/dev/null 2>&1; echo $?
@ -112,6 +155,32 @@ call rowtype_pckg.rowtype_func();
2 | b
(2 rows)
\sf my_pkg.pkg_f_1
CREATE OR REPLACE FUNCTION public.my_pkg.pkg_f_1(p SYS_REFCURSOR)
RETURN SETOF my_outrec_typ IMMUTABLE NOT FENCED NOT SHIPPABLE PARALLEL_ENABLE (PARTITION p BY ANY) PACKAGE
AS DECLARE out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END ;
/
\sf my_pkg.pkg_f_2
CREATE OR REPLACE FUNCTION public.my_pkg.pkg_f_2(p1 SYS_REFCURSOR, p2 SYS_REFCURSOR)
RETURN SETOF my_outrec_typ IMMUTABLE NOT FENCED NOT SHIPPABLE PARALLEL_ENABLE (PARTITION p1 BY HASH(a,b)) PACKAGE
AS DECLARE out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p1 INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p1%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END ;
/
\c regression
drop database if exists restore_subpartition_db;
NOTICE: database "restore_subpartition_db" does not exist, skipping

View File

@ -42,7 +42,7 @@ test: extract_pushdown_or_clause
test: workload_manager
test: spm_adaptive_gplan
test: smp
test: smp smp_cursor parallel_enable_function
test: alter_hw_package
test: hw_grant_package gsc_func gsc_db
test: uppercase_attribute_name decode_compatible_with_o outerjoin_bugfix chr_gbk

View File

@ -33,7 +33,7 @@ test: extract_pushdown_or_clause
test: workload_manager
test: spm_adaptive_gplan
test: smp
test: smp smp_cursor parallel_enable_function
test: alter_hw_package
test: hw_grant_package gsc_func gsc_db
test: uppercase_attribute_name decode_compatible_with_o outerjoin_bugfix

View File

@ -0,0 +1,362 @@
create schema parallel_enable_function;
set search_path=parallel_enable_function;
create table employees (employee_id number(6), department_id NUMBER, first_name varchar2(30), last_name varchar2(30), email varchar2(30), phone_number varchar2(30));
BEGIN
FOR i IN 1..100 LOOP
INSERT INTO employees VALUES (i, 60, 'abc', 'def', '123', '123');
END LOOP;
COMMIT;
END;
/
CREATE TYPE my_outrec_typ AS (
employee_id numeric(6,0),
department_id numeric,
first_name character varying(30),
last_name character varying(30),
email character varying(30),
phone_number character varying(30)
);
-- create srf function with parallel_enable
CREATE OR REPLACE FUNCTION hash_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END hash_srf;
/
CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END any_srf;
/
-- create function with multi-partkey
CREATE OR REPLACE FUNCTION multi_partkey_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id, department_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END multi_partkey_srf;
/
-- create pipelined function
create type table_my_outrec_typ is table of my_outrec_typ;
CREATE OR REPLACE FUNCTION pipelined_table_f (p SYS_REFCURSOR) RETURN table_my_outrec_typ pipelined parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
pipe row(out_rec);
END LOOP;
END pipelined_table_f;
/
CREATE OR REPLACE FUNCTION pipelined_array_f (p SYS_REFCURSOR) RETURN _employees PIPELINED parallel_enable (partition p by any)
IS
in_rec my_outrec_typ;
BEGIN
LOOP
FETCH p INTO in_rec;
EXIT WHEN p%NOTFOUND;
PIPE ROW (in_rec);
END LOOP;
END pipelined_array_f;
/
-- without partition by
CREATE OR REPLACE FUNCTION no_partition_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END no_partition_srf;
/
-- call function
set query_dop = 1002;
explain (costs off) select * from hash_srf(cursor (select * from employees)) limit 10;
select * from hash_srf(cursor (select * from employees)) limit 10;
explain (costs off) select * from any_srf(cursor (select * from employees)) limit 10;
select * from any_srf(cursor (select * from employees)) limit 10;
explain (costs off) select * from pipelined_table_f(cursor (select * from employees)) limit 10;
select * from pipelined_table_f(cursor (select * from employees)) limit 10;
explain (costs off) select * from multi_partkey_srf(cursor (select * from employees)) limit 10;
select * from multi_partkey_srf(cursor (select * from employees)) limit 10;
explain (costs off) select * from pipelined_array_f(cursor (select * from employees)) limit 10;
select * from pipelined_array_f(cursor (select * from employees)) limit 10;
explain (costs off) select * from no_partition_srf(cursor (select * from employees)) limit 10;
select * from no_partition_srf(cursor (select * from employees)) limit 10;
-- test count(*)
explain (costs off) select count(*) from hash_srf(cursor (select * from employees));
select count(*) from hash_srf(cursor (select * from employees));
-- test multi cursor args
CREATE OR REPLACE FUNCTION multi_cursor_srf (p1 SYS_REFCURSOR, p2 SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p1 by hash(employee_id)) IS
out_rec_1 my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
out_rec_2 my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p1 INTO out_rec_1.employee_id, out_rec_1.department_id, out_rec_1.first_name, out_rec_1.last_name, out_rec_1.email, out_rec_1.phone_number; -- input row
EXIT WHEN p1%NOTFOUND;
FETCH p2 INTO out_rec_2.employee_id, out_rec_2.department_id, out_rec_2.first_name, out_rec_2.last_name, out_rec_2.email, out_rec_2.phone_number; -- input row
EXIT WHEN p2%NOTFOUND;
return next out_rec_1;
END LOOP;
RETURN;
END multi_cursor_srf;
/
explain (costs off) select * from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)) limit 10;
select * from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)) limit 10;
explain (costs off) select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees));
select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees));
-- nested function call
explain (costs off) select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10;
select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10;
-- functionscan join
explain (costs off) select * from hash_srf(cursor (select * from employees)) a, hash_srf(cursor (select * from employees)) b limit 10;
select * from hash_srf(cursor (select * from employees)) a, hash_srf(cursor (select * from employees)) b limit 10;
-- targetlist
explain (costs off) select hash_srf(cursor (select * from employees)) limit 10;
select hash_srf(cursor (select * from employees)) limit 10;
-- subquery cannot smp
explain (costs off) select 1, (select count(*) from hash_srf(cursor (select * from employees)));
select 1, (select count(*) from hash_srf(cursor (select * from employees)));
-- test create or replace
CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END any_srf;
/
select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'any_srf'::regproc;
CREATE OR REPLACE FUNCTION any_srf (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END any_srf;
/
select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'any_srf'::regproc;
-- set provolatile. stable/volatile with parallel_enable would throw error
CREATE OR REPLACE FUNCTION stable_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ stable parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END stable_f;
/
CREATE OR REPLACE FUNCTION volatile_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ volatile parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END volatile_f;
/
CREATE OR REPLACE FUNCTION immutable_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ immutable parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END immutable_f;
/
-- Alter Function set volatile/stable would clear parallel_cursor info
alter function immutable_f(p SYS_REFCURSOR) volatile;
select parallel_cursor_seq, parallel_cursor_strategy, parallel_cursor_partkey from pg_proc_ext where proc_oid = 'immutable_f'::regproc;
alter function immutable_f(p SYS_REFCURSOR) stable;
alter function immutable_f(p SYS_REFCURSOR) immutable;
-- throw error when the operation of parallel cursor is not FETCH CURSOR
CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH absolute 5 from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_opr_f;
/
CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH backward from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_opr_f;
/
CREATE OR REPLACE FUNCTION invalid_opr_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH prior from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_opr_f;
/
-- test specified non refcursor type
CREATE OR REPLACE FUNCTION invalid_type_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition a by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_type_f;
/
CREATE OR REPLACE FUNCTION invalid_type_f (p SYS_REFCURSOR, a int) RETURN setof my_outrec_typ parallel_enable (partition a by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END invalid_type_f;
/
-- create non-SRF/pipelined function
CREATE OR REPLACE FUNCTION return_int_f (p SYS_REFCURSOR) RETURN int parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
res int := 0;
BEGIN
LOOP
FETCH from p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
res := res + 1;
END LOOP;
RETURN res;
END return_int_f;
/
explain (costs off) select * from return_int_f(cursor (select * from employees));
select * from return_int_f(cursor (select * from employees));
-- declare cursor
begin;
declare xc no scroll cursor for select * from employees;
explain select * from hash_srf('xc');
end;
-- test bulk collect
CREATE OR REPLACE FUNCTION bulk_collect_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by hash(employee_id)) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
emp_tab table_my_outrec_typ;
BEGIN
LOOP
FETCH p bulk collect INTO emp_tab limit 5; -- input row
EXIT WHEN p%NOTFOUND;
out_rec := emp_tab(emp_tab.first);
return next out_rec;
END LOOP;
RETURN;
END bulk_collect_f;
/
explain (costs off) select count(*) from bulk_collect_f(cursor (select * from employees));
select count(*) from bulk_collect_f(cursor (select * from employees));
-- create package
create or replace package my_pkg as
FUNCTION pkg_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any);
end my_pkg;
/
create or replace package body my_pkg as
FUNCTION pkg_f (p SYS_REFCURSOR) RETURN setof my_outrec_typ parallel_enable (partition p by any) IS
out_rec my_outrec_typ := my_outrec_typ(NULL, NULL, NULL, NULL, NULL, NULL);
BEGIN
LOOP
FETCH p INTO out_rec.employee_id, out_rec.department_id, out_rec.first_name, out_rec.last_name, out_rec.email, out_rec.phone_number; -- input row
EXIT WHEN p%NOTFOUND;
return next out_rec;
END LOOP;
RETURN;
END pkg_f;
end my_pkg;
/
explain (costs off) select count(*) from my_pkg.pkg_f(cursor (select * from employees));
select count(*) from my_pkg.pkg_f(cursor (select * from employees));
drop schema parallel_enable_function cascade;

View File

@ -0,0 +1,82 @@
create schema smp_cursor;
set search_path=smp_cursor;
create table t1(a int, b int, c int, d bigint);
insert into t1 values(generate_series(1, 100), generate_series(1, 10), generate_series(1, 2), generate_series(1, 50));
analyze t1;
set query_dop=1002;
explain (costs off) select * from t1;
set enable_auto_explain = on;
set auto_explain_level = notice;
-- test cursor smp
begin;
declare xc no scroll cursor for select * from t1;
fetch xc;
end;
-- test plan hint
begin;
declare xc no scroll cursor for select /*+ set(query_dop 1) */ * from t1;
fetch xc;
end;
set query_dop = 1;
begin;
declare xc no scroll cursor for select /*+ set(query_dop 1002) */ * from t1;
fetch xc;
end;
-- scroll cursor can not smp
set query_dop = 1002;
begin;
declare xc cursor for select /*+ set(query_dop 1002) */ * from t1;
fetch xc;
end;
-- cursor declared with plpgsql can not smp
declare
cursor xc no scroll is select * from t1;
tmp t1%ROWTYPE;
begin
open xc;
fetch xc into tmp;
close xc;
end;
/
-- test resource conflict checking
begin;
declare xc no scroll cursor for select * from t1;
drop table t1;
end;
-- test cursor with hold
begin;
declare xc no scroll cursor with hold for select * from t1;
fetch xc;
end;
fetch absolute 10 xc;
close xc;
-- test cursor backward error
begin;
declare xc no scroll cursor for select * from t1;
fetch absolute 10 xc;
fetch absolute 9 xc;
end;
-- test cursor other operate
begin;
declare xc no scroll cursor for select * from t1;
fetch first xc;
fetch forward xc;
fetch absolute 5 xc;
fetch relative 5 xc;
fetch all xc;
move xc;
end;
drop schema smp_cursor cascade;