!6490 gms_tcp高级包

Merge pull request !6490 from 张世乐/utl_tcp_dev
This commit is contained in:
opengauss_bot
2024-11-05 09:52:55 +00:00
committed by Gitee
27 changed files with 3727 additions and 1 deletions

View File

@ -105,6 +105,7 @@ install:
@if test -d contrib/datavec; then $(MAKE) -C contrib/datavec clean; fi
@if test -d contrib/datavec; then $(MAKE) -C contrib/datavec $@; fi
@if test -d contrib/gms_stats; then $(MAKE) -C contrib/gms_stats $@; fi
@if test -d contrib/gms_tcp; then $(MAKE) -C contrib/gms_tcp $@; fi
@if test -d contrib/gms_profiler; then $(MAKE) -C contrib/gms_profiler $@; fi
@if test -d contrib/gms_output; then $(MAKE) -C contrib/gms_output $@; fi
@if test -d contrib/timescaledb; then (./contrib/timescaledb/run_to_build.sh && $(MAKE) -C contrib/timescaledb/build $@); fi

View File

@ -127,6 +127,8 @@
./share/postgresql/extension/gms_lob.control
./share/postgresql/extension/gms_stats--1.0.sql
./share/postgresql/extension/gms_stats.control
./share/postgresql/extension/gms_tcp--1.0.sql
./share/postgresql/extension/gms_tcp.control
./share/postgresql/extension/gms_profiler--1.0.sql
./share/postgresql/extension/gms_profiler.control
./share/postgresql/extension/gms_sql--1.0.sql
@ -833,6 +835,7 @@
./lib/postgresql/gms_output.so
./lib/postgresql/gms_lob.so
./lib/postgresql/gms_stats.so
./lib/postgresql/gms_tcp.so
./lib/postgresql/gms_profiler.so
./lib/postgresql/gms_sql.so
./lib/libpljava.so

View File

@ -119,6 +119,8 @@
./share/postgresql/extension/gms_profiler.control
./share/postgresql/extension/gms_sql--1.0.sql
./share/postgresql/extension/gms_sql.control
./share/postgresql/extension/gms_tcp--1.0.sql
./share/postgresql/extension/gms_tcp.control
./share/postgresql/timezone/GB-Eire
./share/postgresql/timezone/Turkey
./share/postgresql/timezone/Kwajalein
@ -805,6 +807,7 @@
./lib/postgresql/gms_stats.so
./lib/postgresql/gms_profiler.so
./lib/postgresql/gms_sql.so
./lib/postgresql/gms_tcp.so
./lib/libpljava.so
./lib/libpq.a
./lib/libpq.so

View File

@ -127,6 +127,8 @@
./share/postgresql/extension/gms_lob.control
./share/postgresql/extension/gms_stats--1.0.sql
./share/postgresql/extension/gms_stats.control
./share/postgresql/extension/gms_tcp--1.0.sql
./share/postgresql/extension/gms_tcp.control
./share/postgresql/extension/gms_profiler--1.0.sql
./share/postgresql/extension/gms_profiler.control
./share/postgresql/extension/gms_sql--1.0.sql
@ -830,6 +832,7 @@
./lib/postgresql/dblink.so
./lib/postgresql/gms_lob.so
./lib/postgresql/gms_stats.so
./lib/postgresql/gms_tcp.so
./lib/postgresql/pgoutput.so
./lib/postgresql/assessment.so
./lib/postgresql/gms_output.so

View File

@ -31,6 +31,7 @@ set(CMAKE_MODULE_PATH
${CMAKE_CURRENT_SOURCE_DIR}/gms_profiler
${CMAKE_CURRENT_SOURCE_DIR}/gms_lob
${CMAKE_CURRENT_SOURCE_DIR}/gms_sql
${CMAKE_CURRENT_SOURCE_DIR}/gms_tcp
)
add_subdirectory(hstore)
@ -51,6 +52,7 @@ add_subdirectory(file_fdw)
add_subdirectory(log_fdw)
add_subdirectory(gms_stats)
add_subdirectory(gms_sql)
add_subdirectory(gms_tcp)
if("${ENABLE_MULTIPLE_NODES}" STREQUAL "OFF")
add_subdirectory(gc_fdw)
endif()

View File

@ -0,0 +1,21 @@
#This is the main CMAKE for build all gms_tcp.
# gms_tcp
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} TGT_gms_tcp_SRC)
set(TGT_gms_tcp_INC
${PROJECT_OPENGS_DIR}/contrib/gms_tcp
${PROJECT_OPENGS_DIR}/contrib
)
set(gms_tcp_DEF_OPTIONS ${MACRO_OPTIONS})
set(gms_tcp_COMPILE_OPTIONS ${OPTIMIZE_OPTIONS} ${OS_OPTIONS} ${PROTECT_OPTIONS} ${WARNING_OPTIONS} ${LIB_SECURE_OPTIONS} ${CHECK_OPTIONS})
set(gms_tcp_LINK_OPTIONS ${LIB_LINK_OPTIONS})
add_shared_libtarget(gms_tcp TGT_gms_tcp_SRC TGT_gms_tcp_INC "${gms_tcp_DEF_OPTIONS}" "${gms_tcp_COMPILE_OPTIONS}" "${gms_tcp_LINK_OPTIONS}")
set_target_properties(gms_tcp PROPERTIES PREFIX "")
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/gms_tcp.control
DESTINATION share/postgresql/extension/
)
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/gms_tcp--1.0.sql
DESTINATION share/postgresql/extension/
)
install(TARGETS gms_tcp DESTINATION lib/postgresql)

29
contrib/gms_tcp/Makefile Normal file
View File

@ -0,0 +1,29 @@
# contrib/gms_tcp/Makefile
MODULE_big = gms_tcp
OBJS = gms_tcp.o
EXTENSION = gms_tcp
DATA = gms_tcp--1.0.sql
exclude_option = -fPIE
override CPPFLAGS := -fstack-protector-strong $(filter-out $(exclude_option),$(CPPFLAGS))
REGRESS = gms_tcp
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = contrib/gms_tcp
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
regress_home = $(top_builddir)/src/test/regress
REGRESS_OPTS = -c 0 -d 1 -r 1 -p 25632 --single_node -w --keep_last_data=false \
--regconf=$(regress_home)/regress.conf \
--schedule=parallel_schedule \
--temp-config=$(regress_home)/make_fastcheck_postgresql.conf
include $(top_srcdir)/contrib/contrib-global.mk
endif
gms_tcp.o: gms_tcp.cpp

View File

@ -0,0 +1 @@
The openGauss regression needs this file to run.

View File

@ -0,0 +1,13 @@
\dx
List of installed extensions
Name | Version | Schema | Description
-----------------+---------+------------+------------------------------------------------------------
dist_fdw | 1.0 | pg_catalog | foreign-data wrapper for distfs access
file_fdw | 1.0 | pg_catalog | foreign-data wrapper for flat file access
gms_tcp | 1.0 | public | provides TCP/IP client=side access functionality in PL/SQL
hstore | 1.1 | pg_catalog | data type for storing sets of (key, value) pairs
log_fdw | 1.0 | pg_catalog | Foreign Data Wrapper for accessing logging data
plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language
security_plugin | 1.0 | pg_catalog | provides security functionality
(7 rows)

View File

@ -0,0 +1,714 @@
--
--test in_buffer
--
create extension gms_tcp;
create or replace function gms_tcp_test_in_buffer()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'in buffer');
pg_sleep(1);
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 5;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 12;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 13;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 12;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 5;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 17;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 9;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 11;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_line(c);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when gms_tcp_network_error then
raise info 'caught gms_tcp_network_error';
gms_tcp.close_all_connections();
when gms_tcp_bad_argument then
raise info 'caught gms_tcp_bad_argument';
gms_tcp.close_all_connections();
when gms_tcp_buffer_too_small then
raise info 'caught gms_tcp_buffer_too_small';
gms_tcp.close_all_connections();
when gms_tcp_end_of_input then
raise info 'caught gms_tcp_end_of_input';
gms_tcp.close_all_connections();
when gms_tcp_transfer_timeout then
raise info 'caught gms_tcp_transfer_timeout';
gms_tcp.close_all_connections();
when gms_tcp_partial_multibyte_char then
raise info 'caught gms_tcp_partial_multibyte_char';
gms_tcp.close_all_connections();
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--
--test read data
--
--get line
create or replace function gms_tcp_test_get_line()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'get line');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_line(c, true);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--get text
create or replace function gms_tcp_test_get_text()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'get text');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_text(c, 17, true);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--get raw
create or replace function gms_tcp_test_get_raw()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data raw;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'get raw');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_raw(c, 4, true);
raise info 'available: %, rcv: %.', num, data;
end if;
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_raw(c, 8);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--read line
create or replace function gms_tcp_test_read_line()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'read line');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
gms_tcp.read_line(c, data, len, true);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.available(c,1);
if num > 0 then
gms_tcp.read_line(c, data, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--read text
create or replace function gms_tcp_test_read_text()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
out_len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'read text');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
out_len = 18;
gms_tcp.read_text(c, data, len, out_len, true);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
create or replace function gms_tcp_test_read_raw()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data raw;
len integer;
out_len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'read raw');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
out_len = 3;
gms_tcp.read_raw(c, data, len, out_len, true);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.available(c,1);
if num > 0 then
out_len = 4;
gms_tcp.read_raw(c, data, len, out_len, true);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.available(c,1);
if num > 0 then
out_len = 8;
gms_tcp.read_raw(c, data, len, out_len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--write line
create or replace function gms_tcp_test_write_line()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
newline=>'LF',
tx_timeout=>10);
num = gms_tcp.write_line(c, 'write line');
pg_sleep(1);
num = gms_tcp.write_line(c, '0123456789');
num = gms_tcp.available(c,1);
if num > 0 then
gms_tcp.read_line(c, data, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--write text
create or replace function gms_tcp_test_write_text()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
--out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_text(c, 'write text', 10);
pg_sleep(1);
num = gms_tcp.write_text(c, '0123456789', 6);
num = gms_tcp.available(c,1);
if num > 0 then
gms_tcp.read_line(c, data, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
create or replace function gms_tcp_test_error_in_buffer_size()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>40480,
out_buffer_size=>20480,
newline=>'lf',
tx_timeout=>10);
gms_tcp.close_all_connections();
exception
when gms_tcp_network_error then
raise info 'caught gms_tcp_network_error';
gms_tcp.close_all_connections();
when gms_tcp_bad_argument then
raise info 'caught gms_tcp_bad_argument';
gms_tcp.close_all_connections();
when gms_tcp_buffer_too_small then
raise info 'caught gms_tcp_buffer_too_small';
gms_tcp.close_all_connections();
when gms_tcp_end_of_input then
raise info 'caught gms_tcp_end_of_input';
gms_tcp.close_all_connections();
when gms_tcp_transfer_timeout then
raise info 'caught gms_tcp_transfer_timeout';
gms_tcp.close_all_connections();
when gms_tcp_partial_multibyte_char then
raise info 'caught gms_tcp_partial_multibyte_char';
gms_tcp.close_all_connections();
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
create or replace function gms_tcp_test_error_out_buffer_size()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>40480,
newline=>'lf',
tx_timeout=>10);
gms_tcp.close_all_connections();
exception
when gms_tcp_network_error then
raise info 'caught gms_tcp_network_error';
gms_tcp.close_all_connections();
when gms_tcp_bad_argument then
raise info 'caught gms_tcp_bad_argument';
gms_tcp.close_all_connections();
when gms_tcp_buffer_too_small then
raise info 'caught gms_tcp_buffer_too_small';
gms_tcp.close_all_connections();
when gms_tcp_end_of_input then
raise info 'caught gms_tcp_end_of_input';
gms_tcp.close_all_connections();
when gms_tcp_transfer_timeout then
raise info 'caught gms_tcp_transfer_timeout';
gms_tcp.close_all_connections();
when gms_tcp_partial_multibyte_char then
raise info 'caught gms_tcp_partial_multibyte_char';
gms_tcp.close_all_connections();
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--
--char_set
--
create or replace function gms_tcp_test_char_set()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
cset=>'gbk',
tx_timeout=>10);
num = gms_tcp.write_line(c, 'char set');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_line(c);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
create or replace function gms_tcp_test_quit()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'quit');
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
select pg_sleep(5);
pg_sleep
----------
(1 row)
select gms_tcp_test_in_buffer();
INFO: available: 8, rcv: aaaab(5).
CONTEXT: referenced column: gms_tcp_test_in_buffer
INFO: available: 11, rcv: bbbccccdddd(12).
CONTEXT: referenced column: gms_tcp_test_in_buffer
INFO: available: 8, rcv: eeeeffff(13).
CONTEXT: referenced column: gms_tcp_test_in_buffer
INFO: available: 8, rcv: gggghhhh(12).
CONTEXT: referenced column: gms_tcp_test_in_buffer
INFO: available: 8, rcv: 01234(5).
CONTEXT: referenced column: gms_tcp_test_in_buffer
INFO: available: 11, rcv: 567aaaabbbb(17).
CONTEXT: referenced column: gms_tcp_test_in_buffer
INFO: available: 8, rcv: ccccdddd(9).
CONTEXT: referenced column: gms_tcp_test_in_buffer
INFO: available: 8, rcv: eeeeffff(11).
CONTEXT: referenced column: gms_tcp_test_in_buffer
INFO: available: 8, rcv: gggghhhh.
CONTEXT: referenced column: gms_tcp_test_in_buffer
gms_tcp_test_in_buffer
------------------------
(1 row)
select gms_tcp_test_get_line();
INFO: available: 28, rcv: get line, abcdefg1234567890
.
CONTEXT: referenced column: gms_tcp_test_get_line
gms_tcp_test_get_line
-----------------------
(1 row)
select gms_tcp_test_get_text();
INFO: available: 28, rcv: get text, abcdefg.
CONTEXT: referenced column: gms_tcp_test_get_text
gms_tcp_test_get_text
-----------------------
(1 row)
select gms_tcp_test_get_raw();
INFO: available: 10, rcv: 01020304.
CONTEXT: referenced column: gms_tcp_test_get_raw
INFO: available: 10, rcv: 0102030405060708.
CONTEXT: referenced column: gms_tcp_test_get_raw
gms_tcp_test_get_raw
----------------------
(1 row)
select gms_tcp_test_read_line();
INFO: available: 29, rcv: read line, abcdefg1234567890
(29).
CONTEXT: referenced column: gms_tcp_test_read_line
gms_tcp_test_read_line
------------------------
(1 row)
select gms_tcp_test_read_text();
INFO: available: 29, rcv: read text, abcdefg(18).
CONTEXT: referenced column: gms_tcp_test_read_text
gms_tcp_test_read_text
------------------------
(1 row)
select gms_tcp_test_read_raw();
INFO: available: 10, rcv: 010203(6).
CONTEXT: referenced column: gms_tcp_test_read_raw
INFO: available: 10, rcv: 01020304(8).
CONTEXT: referenced column: gms_tcp_test_read_raw
INFO: available: 10, rcv: 0102030405060708(16).
CONTEXT: referenced column: gms_tcp_test_read_raw
gms_tcp_test_read_raw
-----------------------
(1 row)
select gms_tcp_test_write_line();
gms_tcp_test_write_line
-------------------------
(1 row)
select gms_tcp_test_write_text();
gms_tcp_test_write_text
-------------------------
(1 row)
select gms_tcp_test_error_in_buffer_size();
INFO: caught gms_tcp_bad_argument
CONTEXT: referenced column: gms_tcp_test_error_in_buffer_size
gms_tcp_test_error_in_buffer_size
-----------------------------------
(1 row)
select gms_tcp_test_error_out_buffer_size();
INFO: caught gms_tcp_bad_argument
CONTEXT: referenced column: gms_tcp_test_error_out_buffer_size
gms_tcp_test_error_out_buffer_size
------------------------------------
(1 row)
select gms_tcp_test_char_set();
INFO: available: 7, rcv: abcdefg.
CONTEXT: referenced column: gms_tcp_test_char_set
gms_tcp_test_char_set
-----------------------
(1 row)
select gms_tcp_test_quit();
gms_tcp_test_quit
-------------------
(1 row)
drop function gms_tcp_test_in_buffer();
drop function gms_tcp_test_get_line();
drop function gms_tcp_test_get_text();
drop function gms_tcp_test_get_raw();
drop function gms_tcp_test_read_line();
drop function gms_tcp_test_read_text();
drop function gms_tcp_test_read_raw();
drop function gms_tcp_test_write_line();
drop function gms_tcp_test_write_text();
drop function gms_tcp_test_error_in_buffer_size();
drop function gms_tcp_test_error_out_buffer_size();
drop function gms_tcp_test_char_set();
drop function gms_tcp_test_quit();

View File

@ -0,0 +1,330 @@
--
-- create schema
--
CREATE SCHEMA gms_tcp;
--
-- create type connection
--
CREATE TYPE gms_tcp.connection;
CREATE FUNCTION gms_tcp.connection_in(cstring)
RETURNS gms_tcp.connection
AS 'MODULE_PATHNAME','gms_tcp_connection_in'
LANGUAGE C STRICT NOT FENCED;
CREATE FUNCTION gms_tcp.connection_out(gms_tcp.connection)
RETURNS cstring
AS 'MODULE_PATHNAME','gms_tcp_connection_out'
LANGUAGE C STRICT NOT FENCED;
CREATE TYPE gms_tcp.connection(
internallength = 512,
input = gms_tcp.connection_in,
output = gms_tcp.connection_out
);
--
-- function: crlf
--
CREATE FUNCTION gms_tcp.crlf()
RETURNS varchar2
AS 'MODULE_PATHNAME','gms_tcp_crlf'
LANGUAGE C STRICT NOT FENCED;
--
-- function: available
-- determines the number of bytes available for reading from a tcp/ip connection.
--
CREATE FUNCTION gms_tcp.available_real(c in gms_tcp.connection,
timeout in int)
RETURNS integer
AS 'MODULE_PATHNAME','gms_tcp_available_real'
LANGUAGE C STRICT NOT FENCED;
create or replace function gms_tcp.available(c in gms_tcp.connection,
timeout in int default 0)
returns integer
language plpgsql
as $function$
begin
return gms_tcp.available_real(c, timeout);
end;
$function$;
--
-- funciton: close_all_connections
--
CREATE FUNCTION gms_tcp.close_all_connections()
RETURNS void
AS 'MODULE_PATHNAME','gms_tcp_close_all_connections'
LANGUAGE C STRICT NOT FENCED;
--
-- funciton: close_connection
--
CREATE FUNCTION gms_tcp.close_connection(c in gms_tcp.connection)
RETURNS void
AS 'MODULE_PATHNAME','gms_tcp_close_connection'
LANGUAGE C STRICT NOT FENCED;
--
-- function: flush
-- transmits immediately to the server all data in the output buffer, if a buffer is used.
--
CREATE FUNCTION gms_tcp.flush(c in gms_tcp.connection)
RETURNS void
AS 'MODULE_PATHNAME','gms_tcp_flush'
LANGUAGE C STRICT NOT FENCED;
--
-- function: get_line
--
CREATE FUNCTION gms_tcp.get_line_real(c in gms_tcp.connection,
remove_crlf in boolean,
peek in boolean,
ch_charset in boolean default false)
RETURNS text
AS 'MODULE_PATHNAME','gms_tcp_get_line_real'
LANGUAGE C STRICT NOT FENCED;
create or replace function gms_tcp.get_line(c in gms_tcp.connection,
remove_crlf in boolean default false,
peek in boolean default false)
returns text
language plpgsql
as $function$
begin
return gms_tcp.get_line_real(c, remove_crlf, peek, false);
end;
$function$;
--
-- function: get_raw
--
CREATE FUNCTION gms_tcp.get_raw_real(c in gms_tcp.connection,
len in integer,
peek in boolean)
RETURNS raw
AS 'MODULE_PATHNAME','gms_tcp_get_raw_real'
LANGUAGE C STRICT NOT FENCED;
create or replace function gms_tcp.get_raw(c in gms_tcp.connection,
len in integer default 1,
peek in boolean default false)
returns raw
language plpgsql
as $function$
begin
return gms_tcp.get_raw_real(c, len, peek);
end;
$function$;
--
-- function: get_text
--
CREATE FUNCTION gms_tcp.get_text_real(c in gms_tcp.connection,
len in integer,
peek in boolean,
ch_charset in boolean default false)
RETURNS text
AS 'MODULE_PATHNAME','gms_tcp_get_text_real'
LANGUAGE C STRICT NOT FENCED;
create or replace function gms_tcp.get_text(c in gms_tcp.connection,
len in integer default 1,
peek in boolean default false)
returns text
language plpgsql
as $function$
begin
return gms_tcp.get_text_real(c, len, peek, false);
end;
$function$;
--
-- function: open_connection
--
CREATE FUNCTION gms_tcp.open_connection_real(remote_host in varchar2,
remote_port in integer,
local_host in varchar2 default 0,
local_port in integer default 0,
in_buffer_size in integer default 0,
out_buffer_size in integer default 0,
cset in varchar2 default 0,
newline in varchar2 default 'CRLF',
tx_timeout in integer default 2147483647)
RETURNS gms_tcp.connection
AS 'MODULE_PATHNAME','gms_tcp_open_connection'
LANGUAGE C STRICT NOT FENCED;
CREATE FUNCTION gms_tcp.open_connection(remote_host in varchar2,
remote_port in integer,
local_host in varchar2 default null,
local_port in integer default null,
in_buffer_size in integer default null,
out_buffer_size in integer default null,
cset in varchar2 default null,
newline in varchar2 default 'CRLF',
tx_timeout in integer default null)
RETURNS gms_tcp.connection
as $$
declare
local_host_tmp varchar2;
local_port_tmp integer;
in_buffer_size_tmp integer;
out_buffer_size_tmp integer;
cset_tmp varchar2;
newline_tmp varchar2;
tx_timeout_tmp integer;
begin
if remote_host is null or remote_port is null then
raise exception 'error input, remote_host or remote_port is null';
end if;
if local_host is null then
local_host_tmp = 0;
else
local_host_tmp = local_host;
end if;
if local_port is null then
local_port_tmp = 0;
else
local_port_tmp = local_port;
end if;
if in_buffer_size is null then
in_buffer_size_tmp = 0;
else
in_buffer_size_tmp = in_buffer_size;
end if;
if out_buffer_size is null then
out_buffer_size_tmp = 0;
else
out_buffer_size_tmp = out_buffer_size;
end if;
if cset is null then
cset_tmp = 0;
else
cset_tmp = cset;
end if;
if newline is null then
newline_tmp = 'CRLF';
else
newline_tmp = newline;
end if;
if tx_timeout is null then
tx_timeout_tmp = 2147483647;
else
tx_timeout_tmp = tx_timeout;
end if;
return gms_tcp.open_connection_real(remote_host,
remote_port,
local_host_tmp,
local_port_tmp,
in_buffer_size_tmp,
out_buffer_size_tmp,
cset_tmp,
newline_tmp,
tx_timeout_tmp);
end;
$$ LANGUAGE plpgsql;
--
-- function: read_line
--
CREATE OR REPLACE PROCEDURE gms_tcp.read_line(c in gms_tcp.connection,
data out varchar2,
len out integer,
remove_crlf in boolean default false,
peek in boolean default false)
as
begin
data = gms_tcp.get_line_real(c, remove_crlf, peek, true);
len = length(data);
end;
--
-- function: read_raw
--
CREATE OR REPLACE PROCEDURE gms_tcp.read_raw(c in gms_tcp.connection,
data out raw,
data_len out integer,
len in integer default 1,
peek in boolean default false)
as
begin
data = gms_tcp.get_raw_real(c, len, peek);
data_len = length(data);
end;
--
-- function: read_text
--
CREATE OR REPLACE PROCEDURE gms_tcp.read_text(c in gms_tcp.connection,
data out varchar2,
data_len out integer,
len in integer default 1,
peek in boolean default false)
as
begin
data = gms_tcp.get_text_real(c, len, peek, true);
data_len = length(data);
end;
--
-- function: write_line
--
CREATE FUNCTION gms_tcp.write_line(c in gms_tcp.connection,
data in varchar2)
RETURNS integer
AS 'MODULE_PATHNAME','gms_tcp_write_line'
LANGUAGE C STRICT NOT FENCED;
--
-- function: write_raw
--
CREATE FUNCTION gms_tcp.write_raw_real(c in gms_tcp.connection,
data in raw,
len in integer)
RETURNS integer
AS 'MODULE_PATHNAME','gms_tcp_write_raw_real'
LANGUAGE C STRICT NOT FENCED;
create or replace function gms_tcp.write_raw(c in gms_tcp.connection,
data in raw,
len in integer default 0)
returns integer
language plpgsql
as $function$
begin
return gms_tcp.write_raw_real(c, data, len);
end;
$function$;
--
-- function: write_text
--
CREATE FUNCTION gms_tcp.write_text_real(c in gms_tcp.connection,
data in varchar2,
len in integer default null)
RETURNS integer
AS 'MODULE_PATHNAME','gms_tcp_write_text_real'
LANGUAGE C STRICT NOT FENCED;
create or replace function gms_tcp.write_text(c in gms_tcp.connection,
data in varchar2,
len in integer default 0)
returns integer
language plpgsql
as $function$
begin
return gms_tcp.write_text_real(c, data, len);
end;
$function$;

View File

@ -0,0 +1,5 @@
# gms_tcp extension
comment = 'provides TCP/IP client=side access functionality in PL/SQL'
default_version = '1.0'
module_pathname = '$libdir/gms_tcp'
relocatable = true

1499
contrib/gms_tcp/gms_tcp.cpp Normal file

File diff suppressed because it is too large Load Diff

135
contrib/gms_tcp/gms_tcp.h Normal file
View File

@ -0,0 +1,135 @@
#ifndef __GMS_TCP__
#define __GMS_TCP__
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <iconv.h>
#include "postgres.h"
#include "knl/knl_thread.h"
#include "utils/builtins.h"
#include "utils/portal.h"
#include "utils/palloc.h"
#include "libpq/pqformat.h"
#include "utils/elog.h"
#define GMS_TCP_MAX_HOST_LEN 255
#define GMS_TCP_MAX_CHARSET_LEN 32
#define GMS_TCP_MAX_NEWLINE_LEN 16
#define GMS_TCP_MAX_IN_BUFFER_SIZE 32767
#define GMS_TCP_MAX_OUT_BUFFER_SIZE 32767
#define GMS_TCP_MAX_CONNECTION 32
#define GMS_TCP_MAX_PASSWORD_LEN 32
#define GMS_TCP_MAX_PATH_LEN 255
#define GMS_TCP_MAX_TX_TIMEOUT 2147483647
#define GMS_TCP_FD_BAK_LIST *u_sess->exec_cxt.portal_data_list
typedef enum {
GMS_TCP_CONNECT_OK = 0,
GMS_TCP_CONNECT_FAIL
} GMS_TCP_CONNECT_STATE;
typedef enum {
GMS_TCP_OK = 0,
GMS_TCP_STRING_TOO_LONG
} GMS_TCP_CONNECT_ERROR;
typedef struct {
int32 total_len;
int32 remote_host_len;
int32 remote_port_len;
int32 local_host_len;
int32 local_port_len;
int32 in_buffer_size_len;
int32 out_buffer_size_len;
int32 charset_len;
int32 newline_len;
int32 tx_timeout_len;
int32 fd_len;
} GMS_TCP_CONNECTION_HEAD;
#define GMS_TCP_MAX_TYPE_LEN 512
#define GMS_TCP_MAX_TYPE_DATA_LEN (GMS_TCP_MAX_TYPE_LEN - sizeof(GMS_TCP_CONNECTION_HEAD))
typedef struct {
GMS_TCP_CONNECTION_HEAD c_h;
char data[GMS_TCP_MAX_TYPE_DATA_LEN];
} GMS_TCP_CONNECTION;
typedef struct {
char *remote_host;
struct sockaddr_in saddr;
int32 remote_port;
char *local_host;
int32 local_port;
int32 in_buffer_size;
int32 out_buffer_size;
char *charset;
char *newline;
bool get_data_wait;
int32 tx_timeout;
bool available_wait;
int32 available_timeout;
int fd;
} GMS_TCP_CONNECTION_INFO;
typedef struct {
int32 max_buffer_size;
int32 free_space;
int32 start;
int32 end;
char *data;
} GMS_TCP_CONNECTION_BUFFER;
typedef struct {
int fd;
bool secure;
GMS_TCP_CONNECT_STATE state;
GMS_TCP_CONNECTION_BUFFER in_buffer;
GMS_TCP_CONNECTION_BUFFER out_buffer;
GMS_TCP_CONNECTION_INFO c_info;
} GMS_TCP_CONNECTION_STATE;
typedef struct {
bool remove_crlf;
bool peek;
bool report_err;
bool ch_charset;
bool encrypt;
int32 len;
char *data;
int32 data_len;
} GMS_TCP_CONNECTION_DATA_OPT;
typedef enum {
ORAFCE_COMM_OK = 0,
ORAFCE_COMM_STRING_TOO_LONG
} ORAFCE_COMM_ERROR;
extern void gms_tcp_release_connection_info(GMS_TCP_CONNECTION_INFO *c_info);
extern void gms_tcp_get_connection_info(GMS_TCP_CONNECTION *c, GMS_TCP_CONNECTION_INFO *c_info);
extern GMS_TCP_CONNECTION_STATE *gms_tcp_get_connection_state_by_fd(int fd);
extern void gms_tcp_get_connection_head_by_info(GMS_TCP_CONNECTION_INFO *c_info, GMS_TCP_CONNECTION_HEAD *c_h);
/*
* External declarations
*/
extern "C" Datum gms_tcp_crlf(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_available_real(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_close_all_connections(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_close_connection(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_flush(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_get_line_real(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_get_raw_real(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_get_text_real(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_open_connection(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_secure_connection(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_no_secure_connection(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_write_line(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_write_raw_real(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_write_text_real(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_connection_in(PG_FUNCTION_ARGS);
extern "C" Datum gms_tcp_connection_out(PG_FUNCTION_ARGS);
#endif

Binary file not shown.

View File

@ -0,0 +1,226 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.*;
import java.nio.charset.Charset;
public
class gms_tcp_server {
public
static void gms_tcp_in_buffer(InputStream inputStream, OutputStream outputStream)
{
int i = 0;
byte[] buffer = new byte[256];
String[] data = {"aaaabbbb",
"ccccdddd",
"eeeeffff",
"gggghhhh",
"01234567"};
while (true) {
try {
int n = inputStream.read(buffer);
String inputMsg = new String(buffer, 0, n);
inputMsg = inputMsg.replaceAll("[\n\r]", "");
if (inputMsg.equals("ok")) {
outputStream.write(data[i].getBytes());
System.out.println("Send msg: " + data[i]);
i = (i + 1) % 5;
}
} catch (Exception e) {
break;
}
}
}
public
static void gms_tcp_char_set(InputStream inputStream, OutputStream outputStream)
{
byte[] inputMsg = new byte[64];
try {
String outString = "abcdefg";
String outMsg = new String(outString.getBytes("GBK"));
outputStream.write(outMsg.getBytes());
} catch (Exception e) {
return;
}
}
public
static void gms_tcp_get_line(InputStream inputStream, OutputStream outputStream)
{
String outMsg = "get line, abcdefg1234567890\n";
try {
outputStream.write(outMsg.getBytes());
System.out.println("Send msg: " + outMsg);
} catch (Exception e) {
return;
}
}
public
static void gms_tcp_get_text(InputStream inputStream, OutputStream outputStream)
{
String outMsg = "get text, abcdefg1234567890\n";
try {
outputStream.write(outMsg.getBytes());
System.out.println("Send msg: " + outMsg);
} catch (Exception e) {
return;
}
}
public
static void gms_tcp_get_raw(InputStream inputStream, OutputStream outputStream)
{
byte[] outMsg = {1,2,3,4,5,6,7,8,9,10};
try {
outputStream.write(outMsg);
} catch (Exception e) {
return;
}
}
public
static void gms_tcp_read_line(InputStream inputStream, OutputStream outputStream)
{
String outMsg = "read line, abcdefg1234567890\n";
try {
outputStream.write(outMsg.getBytes());
System.out.println("Send msg: " + outMsg);
} catch (Exception e) {
return;
}
}
public
static void gms_tcp_read_text(InputStream inputStream, OutputStream outputStream)
{
String outMsg = "read text, abcdefg1234567890\n";
try {
outputStream.write(outMsg.getBytes());
System.out.println("Send msg: " + outMsg);
} catch (Exception e) {
return;
}
}
public
static void gms_tcp_write_line(InputStream inputStream, OutputStream outputStream)
{
byte[] inputMsg = new byte[64];
try {
int n = inputStream.read(inputMsg);
String inputMsgString = new String(inputMsg, 0, n);
System.out.println(inputMsgString);
} catch (Exception e) {
return;
}
}
public
static void gms_tcp_write_text(InputStream inputStream, OutputStream outputStream)
{
byte[] inputMsg = new byte[64];
try {
int n = inputStream.read(inputMsg);
String inputMsgString = new String(inputMsg, 0, n);
System.out.println(inputMsgString);
} catch (Exception e) {
return;
}
}
public
static void gms_tcp_read_raw(InputStream inputStream, OutputStream outputStream)
{
byte[] outMsg = {1,2,3,4,5,6,7,8,9,10};
try {
outputStream.write(outMsg);
} catch (Exception e) {
return;
}
}
public
static void main(String[] args)
{
try {
int port = 12358;
ServerSocket serverSocket = new ServerSocket(port);
System.out.println("Server is ok.");
while (true) {
boolean quit = false;
Socket clientSocket = serverSocket.accept();
System.out.println("Client connect ok.");
InputStream inputStream = clientSocket.getInputStream();
OutputStream outputStream = clientSocket.getOutputStream();
while (true) {
try {
byte[] testType = new byte[64];
int n = inputStream.read(testType);
String testTypeString = new String(testType, 0, n);
testTypeString = testTypeString.replaceAll("[\n\r]", "");
System.out.println("start test: " + testTypeString);
if (testTypeString.equals("get line")) {
gms_tcp_get_line(inputStream, outputStream);
break;
} else if (testTypeString.equals("get text")) {
gms_tcp_get_text(inputStream, outputStream);
break;
} else if (testTypeString.equals("get raw")) {
gms_tcp_get_raw(inputStream, outputStream);
break;
} else if (testTypeString.equals("read line")) {
gms_tcp_read_line(inputStream, outputStream);
break;
} else if (testTypeString.equals("read text")) {
gms_tcp_read_text(inputStream, outputStream);
break;
} else if (testTypeString.equals("read raw")) {
gms_tcp_read_raw(inputStream, outputStream);
break;
} else if (testTypeString.equals("in buffer")) {
gms_tcp_in_buffer(inputStream, outputStream);
break;
} else if (testTypeString.equals("write line")) {
gms_tcp_write_line(inputStream, outputStream);
break;
} else if (testTypeString.equals("write text")) {
gms_tcp_write_text(inputStream, outputStream);
break;
} else if (testTypeString.equals("quit")) {
quit = true;
break;
} else {
gms_tcp_char_set(inputStream, outputStream);
break;
}
} catch (Exception e) {
break;
}
}
if (quit) {
break;
}
System.out.println("one client over\n===================================================================\n");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,2 @@
\! chmod -R 700 @abs_bindir@/../jre
\! @abs_bindir@/../jre/bin/java -cp $CLASSPATH:@abs_builddir@/../../../src/test/regress/jdbc_test/gsjdbc400.jar:@abs_builddir@/gms_tcp_test/. gms_tcp_server

View File

@ -0,0 +1,75 @@
\! chmod -R 700 @abs_bindir@/../jre
\! @abs_bindir@/../jre/bin/java -cp $CLASSPATH:@abs_builddir@/../../../src/test/regress/jdbc_test/gsjdbc400.jar:@abs_builddir@/gms_tcp_test/. gms_tcp_server
Server is ok.
Client connect ok.
start test: in buffer
Send msg: aaaabbbb
Send msg: ccccdddd
Send msg: eeeeffff
Send msg: gggghhhh
Send msg: 01234567
Send msg: aaaabbbb
Send msg: ccccdddd
Send msg: eeeeffff
Send msg: gggghhhh
one client over
===================================================================
Client connect ok.
start test: get line
Send msg: get line, abcdefg1234567890
one client over
===================================================================
Client connect ok.
start test: get text
Send msg: get text, abcdefg1234567890
one client over
===================================================================
Client connect ok.
start test: get raw
one client over
===================================================================
Client connect ok.
start test: read line
Send msg: read line, abcdefg1234567890
one client over
===================================================================
Client connect ok.
start test: read text
Send msg: read text, abcdefg1234567890
one client over
===================================================================
Client connect ok.
start test: read raw
one client over
===================================================================
Client connect ok.
start test: write line
0123456789
one client over
===================================================================
Client connect ok.
start test: write text
012345
one client over
===================================================================
Client connect ok.
start test: char set
one client over
===================================================================
Client connect ok.
start test: quit

View File

@ -0,0 +1 @@
test: gms_tcp_server gms_tcp_client

View File

@ -0,0 +1 @@
\dx

View File

@ -0,0 +1,615 @@
--
--test in_buffer
--
create extension gms_tcp;
create or replace function gms_tcp_test_in_buffer()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'in buffer');
pg_sleep(1);
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 5;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 12;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 13;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 12;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 5;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 17;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 9;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
len = 11;
data = gms_tcp.get_text(c, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.write_line(c, 'ok');
pg_sleep(1);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_line(c);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when gms_tcp_network_error then
raise info 'caught gms_tcp_network_error';
gms_tcp.close_all_connections();
when gms_tcp_bad_argument then
raise info 'caught gms_tcp_bad_argument';
gms_tcp.close_all_connections();
when gms_tcp_buffer_too_small then
raise info 'caught gms_tcp_buffer_too_small';
gms_tcp.close_all_connections();
when gms_tcp_end_of_input then
raise info 'caught gms_tcp_end_of_input';
gms_tcp.close_all_connections();
when gms_tcp_transfer_timeout then
raise info 'caught gms_tcp_transfer_timeout';
gms_tcp.close_all_connections();
when gms_tcp_partial_multibyte_char then
raise info 'caught gms_tcp_partial_multibyte_char';
gms_tcp.close_all_connections();
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--
--test read data
--
--get line
create or replace function gms_tcp_test_get_line()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'get line');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_line(c, true);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--get text
create or replace function gms_tcp_test_get_text()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'get text');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_text(c, 17, true);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--get raw
create or replace function gms_tcp_test_get_raw()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data raw;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'get raw');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_raw(c, 4, true);
raise info 'available: %, rcv: %.', num, data;
end if;
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_raw(c, 8);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--read line
create or replace function gms_tcp_test_read_line()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'read line');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
gms_tcp.read_line(c, data, len, true);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.available(c,1);
if num > 0 then
gms_tcp.read_line(c, data, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--read text
create or replace function gms_tcp_test_read_text()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
out_len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'read text');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
out_len = 18;
gms_tcp.read_text(c, data, len, out_len, true);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
create or replace function gms_tcp_test_read_raw()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data raw;
len integer;
out_len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'read raw');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
out_len = 3;
gms_tcp.read_raw(c, data, len, out_len, true);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.available(c,1);
if num > 0 then
out_len = 4;
gms_tcp.read_raw(c, data, len, out_len, true);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
num = gms_tcp.available(c,1);
if num > 0 then
out_len = 8;
gms_tcp.read_raw(c, data, len, out_len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--write line
create or replace function gms_tcp_test_write_line()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
newline=>'LF',
tx_timeout=>10);
num = gms_tcp.write_line(c, 'write line');
pg_sleep(1);
num = gms_tcp.write_line(c, '0123456789');
num = gms_tcp.available(c,1);
if num > 0 then
gms_tcp.read_line(c, data, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--write text
create or replace function gms_tcp_test_write_text()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
len integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
--out_buffer_size=>20480,
tx_timeout=>10);
num = gms_tcp.write_text(c, 'write text', 10);
pg_sleep(1);
num = gms_tcp.write_text(c, '0123456789', 6);
num = gms_tcp.available(c,1);
if num > 0 then
gms_tcp.read_line(c, data, len);
raise info 'available: %, rcv: %(%).', num, data, len;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
create or replace function gms_tcp_test_error_in_buffer_size()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>40480,
out_buffer_size=>20480,
newline=>'lf',
tx_timeout=>10);
gms_tcp.close_all_connections();
exception
when gms_tcp_network_error then
raise info 'caught gms_tcp_network_error';
gms_tcp.close_all_connections();
when gms_tcp_bad_argument then
raise info 'caught gms_tcp_bad_argument';
gms_tcp.close_all_connections();
when gms_tcp_buffer_too_small then
raise info 'caught gms_tcp_buffer_too_small';
gms_tcp.close_all_connections();
when gms_tcp_end_of_input then
raise info 'caught gms_tcp_end_of_input';
gms_tcp.close_all_connections();
when gms_tcp_transfer_timeout then
raise info 'caught gms_tcp_transfer_timeout';
gms_tcp.close_all_connections();
when gms_tcp_partial_multibyte_char then
raise info 'caught gms_tcp_partial_multibyte_char';
gms_tcp.close_all_connections();
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
create or replace function gms_tcp_test_error_out_buffer_size()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>40480,
newline=>'lf',
tx_timeout=>10);
gms_tcp.close_all_connections();
exception
when gms_tcp_network_error then
raise info 'caught gms_tcp_network_error';
gms_tcp.close_all_connections();
when gms_tcp_bad_argument then
raise info 'caught gms_tcp_bad_argument';
gms_tcp.close_all_connections();
when gms_tcp_buffer_too_small then
raise info 'caught gms_tcp_buffer_too_small';
gms_tcp.close_all_connections();
when gms_tcp_end_of_input then
raise info 'caught gms_tcp_end_of_input';
gms_tcp.close_all_connections();
when gms_tcp_transfer_timeout then
raise info 'caught gms_tcp_transfer_timeout';
gms_tcp.close_all_connections();
when gms_tcp_partial_multibyte_char then
raise info 'caught gms_tcp_partial_multibyte_char';
gms_tcp.close_all_connections();
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
--
--char_set
--
create or replace function gms_tcp_test_char_set()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
data varchar2;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
in_buffer_size=>20480,
out_buffer_size=>20480,
cset=>'gbk',
tx_timeout=>10);
num = gms_tcp.write_line(c, 'char set');
gms_tcp.flush(c);
num = gms_tcp.available(c,1);
if num > 0 then
data = gms_tcp.get_line(c);
raise info 'available: %, rcv: %.', num, data;
end if;
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
create or replace function gms_tcp_test_quit()
returns void
language plpgsql
as $function$
declare
c gms_tcp.connection;
num integer;
begin
c = gms_tcp.open_connection(remote_host=>'127.0.0.1',
remote_port=>12358,
tx_timeout=>10);
num = gms_tcp.write_line(c, 'quit');
gms_tcp.close_all_connections();
exception
when others then
raise info 'caught others';
gms_tcp.close_all_connections();
end;
$function$;
select pg_sleep(5);
select gms_tcp_test_in_buffer();
select gms_tcp_test_get_line();
select gms_tcp_test_get_text();
select gms_tcp_test_get_raw();
select gms_tcp_test_read_line();
select gms_tcp_test_read_text();
select gms_tcp_test_read_raw();
select gms_tcp_test_write_line();
select gms_tcp_test_write_text();
select gms_tcp_test_error_in_buffer_size();
select gms_tcp_test_error_out_buffer_size();
select gms_tcp_test_char_set();
select gms_tcp_test_quit();
drop function gms_tcp_test_in_buffer();
drop function gms_tcp_test_get_line();
drop function gms_tcp_test_get_text();
drop function gms_tcp_test_get_raw();
drop function gms_tcp_test_read_line();
drop function gms_tcp_test_read_text();
drop function gms_tcp_test_read_raw();
drop function gms_tcp_test_write_line();
drop function gms_tcp_test_write_text();
drop function gms_tcp_test_error_in_buffer_size();
drop function gms_tcp_test_error_out_buffer_size();
drop function gms_tcp_test_char_set();
drop function gms_tcp_test_quit();

View File

@ -639,4 +639,12 @@ Section: Class SE - Security Error
SE001 E ERRCODE_INVALID_AUDIT_LOG invalid_audit_log
Section: Class SR - Uncorrected Error & warning
SR001 E ERRCODE_SR_RECOVERY_CONFLICT recovery_conflict
SR001 E ERRCODE_SR_RECOVERY_CONFLICT recovery_conflict
Section: Class UT - gms_tcp
U0001 E ERRCODE_END_OF_INPUT gms_tcp_end_of_input
U0002 E ERRCODE_NETWORK_ERROR gms_tcp_network_error
U0003 E ERRCODE_BAD_ARGUMENT gms_tcp_bad_argument
U0004 E ERRCODE_TRANSFER_TIMEOUT gms_tcp_transfer_timeout
U0005 E ERRCODE_PARTIAL_MULTIBYTE_CHAR gms_tcp_partial_multibyte_char
U0006 E ERRCODE_BUFFER_TOO_SMALL gms_tcp_buffer_too_small

View File

@ -31,6 +31,7 @@
#include "utils/plpgsql.h"
#include "utils/timestamp.h"
#include "utils/resowner.h"
#include "utils/palloc.h"
#include "nodes/execnodes.h"
#include "opfusion/opfusion.h"
@ -274,6 +275,7 @@ Portal CreatePortal(const char* name, bool allowDup, bool dupSilent, bool is_fro
portal->isAutoOutParam = false;
portal->isPkgCur = false;
#endif
portal->specialDataList = NULL;
/* put portal in table (sets portal->name) */
PortalHashTableInsert(portal, name);
@ -530,6 +532,25 @@ void MarkPortalFailed(Portal portal)
}
}
void PortalReleaseSpecialData(Portal portal)
{
ListCell *lc = NULL;
if (!list_length(portal->specialDataList)) {
return;
}
foreach (lc, portal->specialDataList) {
PortalSpecialData *specialData = (PortalSpecialData *)lfirst(lc);
if (specialData->cleanup) {
specialData->cleanup(specialData->data);
}
pfree(specialData);
}
list_free_ext(portal->specialDataList);
}
/*
* PortalDrop
* Destroy the portal.
@ -643,6 +664,8 @@ void PortalDrop(Portal portal, bool isTopCommit)
#endif
MemoryContextDelete(portal->holdContext);
PortalReleaseSpecialData(portal);
/* release subsidiary storage */
MemoryContextDelete(PortalGetHeapMemory(portal));

View File

@ -1017,6 +1017,7 @@ bool PortalRun(
MemoryContext savePortalContext;
MemoryContext saveMemoryContext;
errno_t errorno = EOK;
List **savePortalDataList;
AssertArg(PortalIsValid(portal));
AssertArg(PointerIsValid(portal->commandTag));
@ -1094,6 +1095,7 @@ bool PortalRun(
saveResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner;
savePortalContext = t_thrd.mem_cxt.portal_mem_cxt;
saveMemoryContext = CurrentMemoryContext;
savePortalDataList = u_sess->exec_cxt.portal_data_list;
u_sess->attr.attr_sql.create_index_concurrently = false;
@ -1125,6 +1127,7 @@ bool PortalRun(
needResetErrMsg = stp_disable_xact_and_set_err_msg(&savedisAllowCommitRollback, STP_XACT_TOO_MANY_PORTAL);
}
t_thrd.mem_cxt.portal_mem_cxt = PortalGetHeapMemory(portal);
u_sess->exec_cxt.portal_data_list = &portal->specialDataList;
MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt);
@ -1221,6 +1224,7 @@ bool PortalRun(
t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner;
}
t_thrd.mem_cxt.portal_mem_cxt = savePortalContext;
u_sess->exec_cxt.portal_data_list = savePortalDataList;
ereport(DEBUG3, (errmodule(MOD_NEST_COMPILE), errcode(ERRCODE_LOG),
errmsg("%s clear curr_compile_context because of error.", __func__)));
@ -1269,6 +1273,7 @@ bool PortalRun(
t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner;
}
t_thrd.mem_cxt.portal_mem_cxt = savePortalContext;
u_sess->exec_cxt.portal_data_list = savePortalDataList;
if (portal->strategy != PORTAL_MULTI_QUERY) {
PGSTAT_END_TIME_RECORD(EXECUTION_TIME);

View File

@ -193,6 +193,7 @@ typedef struct knl_u_executor_context {
void *EventTriggerState;
bool isFlashBack;
List **portal_data_list;
} knl_u_executor_context;
typedef struct knl_u_sig_context {

View File

@ -379,6 +379,11 @@ public:
}
};
typedef struct {
std::shared_ptr<void> data;
void (*cleanup)(std::shared_ptr<void> data); /* cleanup hook */
} PortalSpecialData;
/*
*It is used for delete object whose destructor is null and free memory in Destroy()
*_objptr can't include type change, for example (A*)b, that will lead to compile error in (_objptr) = NULL

View File

@ -263,6 +263,11 @@ typedef struct PortalData {
bool isPkgCur; /* cursor variable is a package variable? */
#endif
int nextval_default_expr_type; /* nextval does not support lightproxy and sqlbypass */
List *specialDataList;
/*
* A specific data link list hung under the portal.
* The special data mounted in the portal can be used during cleaning,such as utl_tcp.
*/
} PortalData;
/*