From 64af972345402582ca6313697bacf6ae29116e34 Mon Sep 17 00:00:00 2001 From: zhangshile Date: Sat, 28 Sep 2024 22:13:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81utl=5Ftcp=E9=AB=98=E7=BA=A7?= =?UTF-8?q?=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GNUmakefile.in | 1 + build/script/aarch64_opengauss_list | 3 + .../opengauss_release_list_ubuntu_single | 3 + build/script/x86_64_opengauss_list | 3 + contrib/CMakeLists.txt | 2 + contrib/gms_tcp/CMakeLists.txt | 21 + contrib/gms_tcp/Makefile | 29 + contrib/gms_tcp/data/dummy.txt | 1 + contrib/gms_tcp/expected/gms_tcp.out | 13 + contrib/gms_tcp/expected/gms_tcp_client.out | 714 ++++++++ contrib/gms_tcp/gms_tcp--1.0.sql | 330 ++++ contrib/gms_tcp/gms_tcp.control | 5 + contrib/gms_tcp/gms_tcp.cpp | 1499 +++++++++++++++++ contrib/gms_tcp/gms_tcp.h | 135 ++ .../gms_tcp/gms_tcp_test/gms_tcp_server.class | Bin 0 -> 4436 bytes .../gms_tcp/gms_tcp_test/gms_tcp_server.java | 226 +++ contrib/gms_tcp/input/gms_tcp_server.source | 2 + contrib/gms_tcp/output/gms_tcp_server.source | 75 + contrib/gms_tcp/parallel_schedule | 1 + contrib/gms_tcp/sql/gms_tcp.sql | 1 + contrib/gms_tcp/sql/gms_tcp_client.sql | 615 +++++++ src/common/backend/utils/errcodes.txt | 10 +- src/common/backend/utils/mmgr/portalmem.cpp | 23 + src/gausskernel/process/tcop/pquery.cpp | 5 + src/include/knl/knl_session.h | 1 + src/include/utils/palloc.h | 5 + src/include/utils/portal.h | 5 + 27 files changed, 3727 insertions(+), 1 deletion(-) create mode 100644 contrib/gms_tcp/CMakeLists.txt create mode 100644 contrib/gms_tcp/Makefile create mode 100644 contrib/gms_tcp/data/dummy.txt create mode 100644 contrib/gms_tcp/expected/gms_tcp.out create mode 100644 contrib/gms_tcp/expected/gms_tcp_client.out create mode 100644 contrib/gms_tcp/gms_tcp--1.0.sql create mode 100644 contrib/gms_tcp/gms_tcp.control create mode 100644 contrib/gms_tcp/gms_tcp.cpp create mode 100644 contrib/gms_tcp/gms_tcp.h create mode 100644 contrib/gms_tcp/gms_tcp_test/gms_tcp_server.class create mode 100644 contrib/gms_tcp/gms_tcp_test/gms_tcp_server.java create mode 100644 contrib/gms_tcp/input/gms_tcp_server.source create mode 100644 contrib/gms_tcp/output/gms_tcp_server.source create mode 100644 contrib/gms_tcp/parallel_schedule create mode 100644 contrib/gms_tcp/sql/gms_tcp.sql create mode 100644 contrib/gms_tcp/sql/gms_tcp_client.sql diff --git a/GNUmakefile.in b/GNUmakefile.in index 6e9b64af8..7b904a345 100644 --- a/GNUmakefile.in +++ b/GNUmakefile.in @@ -105,6 +105,7 @@ install: @if test -d contrib/datavec; then $(MAKE) -C contrib/datavec clean; fi @if test -d contrib/datavec; then $(MAKE) -C contrib/datavec $@; fi @if test -d contrib/gms_stats; then $(MAKE) -C contrib/gms_stats $@; fi + @if test -d contrib/gms_tcp; then $(MAKE) -C contrib/gms_tcp $@; fi @if test -d contrib/gms_profiler; then $(MAKE) -C contrib/gms_profiler $@; fi @if test -d contrib/gms_output; then $(MAKE) -C contrib/gms_output $@; fi @if test -d contrib/timescaledb; then (./contrib/timescaledb/run_to_build.sh && $(MAKE) -C contrib/timescaledb/build $@); fi diff --git a/build/script/aarch64_opengauss_list b/build/script/aarch64_opengauss_list index 96e9c684e..0bbf5b388 100644 --- a/build/script/aarch64_opengauss_list +++ b/build/script/aarch64_opengauss_list @@ -127,6 +127,8 @@ ./share/postgresql/extension/gms_lob.control ./share/postgresql/extension/gms_stats--1.0.sql ./share/postgresql/extension/gms_stats.control +./share/postgresql/extension/gms_tcp--1.0.sql +./share/postgresql/extension/gms_tcp.control ./share/postgresql/extension/gms_profiler--1.0.sql ./share/postgresql/extension/gms_profiler.control ./share/postgresql/extension/gms_sql--1.0.sql @@ -833,6 +835,7 @@ ./lib/postgresql/gms_output.so ./lib/postgresql/gms_lob.so ./lib/postgresql/gms_stats.so +./lib/postgresql/gms_tcp.so ./lib/postgresql/gms_profiler.so ./lib/postgresql/gms_sql.so ./lib/libpljava.so diff --git a/build/script/opengauss_release_list_ubuntu_single b/build/script/opengauss_release_list_ubuntu_single index 1ac3feb9c..7f37aa985 100644 --- a/build/script/opengauss_release_list_ubuntu_single +++ b/build/script/opengauss_release_list_ubuntu_single @@ -119,6 +119,8 @@ ./share/postgresql/extension/gms_profiler.control ./share/postgresql/extension/gms_sql--1.0.sql ./share/postgresql/extension/gms_sql.control +./share/postgresql/extension/gms_tcp--1.0.sql +./share/postgresql/extension/gms_tcp.control ./share/postgresql/timezone/GB-Eire ./share/postgresql/timezone/Turkey ./share/postgresql/timezone/Kwajalein @@ -805,6 +807,7 @@ ./lib/postgresql/gms_stats.so ./lib/postgresql/gms_profiler.so ./lib/postgresql/gms_sql.so +./lib/postgresql/gms_tcp.so ./lib/libpljava.so ./lib/libpq.a ./lib/libpq.so diff --git a/build/script/x86_64_opengauss_list b/build/script/x86_64_opengauss_list index b3765a005..2807efe99 100644 --- a/build/script/x86_64_opengauss_list +++ b/build/script/x86_64_opengauss_list @@ -127,6 +127,8 @@ ./share/postgresql/extension/gms_lob.control ./share/postgresql/extension/gms_stats--1.0.sql ./share/postgresql/extension/gms_stats.control +./share/postgresql/extension/gms_tcp--1.0.sql +./share/postgresql/extension/gms_tcp.control ./share/postgresql/extension/gms_profiler--1.0.sql ./share/postgresql/extension/gms_profiler.control ./share/postgresql/extension/gms_sql--1.0.sql @@ -830,6 +832,7 @@ ./lib/postgresql/dblink.so ./lib/postgresql/gms_lob.so ./lib/postgresql/gms_stats.so +./lib/postgresql/gms_tcp.so ./lib/postgresql/pgoutput.so ./lib/postgresql/assessment.so ./lib/postgresql/gms_output.so diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index c3acfb695..5ae029022 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -31,6 +31,7 @@ set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/gms_profiler ${CMAKE_CURRENT_SOURCE_DIR}/gms_lob ${CMAKE_CURRENT_SOURCE_DIR}/gms_sql + ${CMAKE_CURRENT_SOURCE_DIR}/gms_tcp ) add_subdirectory(hstore) @@ -51,6 +52,7 @@ add_subdirectory(file_fdw) add_subdirectory(log_fdw) add_subdirectory(gms_stats) add_subdirectory(gms_sql) +add_subdirectory(gms_tcp) if("${ENABLE_MULTIPLE_NODES}" STREQUAL "OFF") add_subdirectory(gc_fdw) endif() diff --git a/contrib/gms_tcp/CMakeLists.txt b/contrib/gms_tcp/CMakeLists.txt new file mode 100644 index 000000000..c51bbeee5 --- /dev/null +++ b/contrib/gms_tcp/CMakeLists.txt @@ -0,0 +1,21 @@ +#This is the main CMAKE for build all gms_tcp. +# gms_tcp +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} TGT_gms_tcp_SRC) +set(TGT_gms_tcp_INC + ${PROJECT_OPENGS_DIR}/contrib/gms_tcp + ${PROJECT_OPENGS_DIR}/contrib +) + +set(gms_tcp_DEF_OPTIONS ${MACRO_OPTIONS}) +set(gms_tcp_COMPILE_OPTIONS ${OPTIMIZE_OPTIONS} ${OS_OPTIONS} ${PROTECT_OPTIONS} ${WARNING_OPTIONS} ${LIB_SECURE_OPTIONS} ${CHECK_OPTIONS}) +set(gms_tcp_LINK_OPTIONS ${LIB_LINK_OPTIONS}) +add_shared_libtarget(gms_tcp TGT_gms_tcp_SRC TGT_gms_tcp_INC "${gms_tcp_DEF_OPTIONS}" "${gms_tcp_COMPILE_OPTIONS}" "${gms_tcp_LINK_OPTIONS}") +set_target_properties(gms_tcp PROPERTIES PREFIX "") + +install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/gms_tcp.control + DESTINATION share/postgresql/extension/ +) +install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/gms_tcp--1.0.sql + DESTINATION share/postgresql/extension/ +) +install(TARGETS gms_tcp DESTINATION lib/postgresql) diff --git a/contrib/gms_tcp/Makefile b/contrib/gms_tcp/Makefile new file mode 100644 index 000000000..de182ac56 --- /dev/null +++ b/contrib/gms_tcp/Makefile @@ -0,0 +1,29 @@ +# contrib/gms_tcp/Makefile +MODULE_big = gms_tcp +OBJS = gms_tcp.o + +EXTENSION = gms_tcp +DATA = gms_tcp--1.0.sql + +exclude_option = -fPIE +override CPPFLAGS := -fstack-protector-strong $(filter-out $(exclude_option),$(CPPFLAGS)) + +REGRESS = gms_tcp + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/gms_tcp +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +regress_home = $(top_builddir)/src/test/regress +REGRESS_OPTS = -c 0 -d 1 -r 1 -p 25632 --single_node -w --keep_last_data=false \ + --regconf=$(regress_home)/regress.conf \ + --schedule=parallel_schedule \ + --temp-config=$(regress_home)/make_fastcheck_postgresql.conf +include $(top_srcdir)/contrib/contrib-global.mk +endif + +gms_tcp.o: gms_tcp.cpp diff --git a/contrib/gms_tcp/data/dummy.txt b/contrib/gms_tcp/data/dummy.txt new file mode 100644 index 000000000..8e09a4f6c --- /dev/null +++ b/contrib/gms_tcp/data/dummy.txt @@ -0,0 +1 @@ +The openGauss regression needs this file to run. diff --git a/contrib/gms_tcp/expected/gms_tcp.out b/contrib/gms_tcp/expected/gms_tcp.out new file mode 100644 index 000000000..ba0cd066e --- /dev/null +++ b/contrib/gms_tcp/expected/gms_tcp.out @@ -0,0 +1,13 @@ +\dx + List of installed extensions + Name | Version | Schema | Description +-----------------+---------+------------+------------------------------------------------------------ + dist_fdw | 1.0 | pg_catalog | foreign-data wrapper for distfs access + file_fdw | 1.0 | pg_catalog | foreign-data wrapper for flat file access + gms_tcp | 1.0 | public | provides TCP/IP client=side access functionality in PL/SQL + hstore | 1.1 | pg_catalog | data type for storing sets of (key, value) pairs + log_fdw | 1.0 | pg_catalog | Foreign Data Wrapper for accessing logging data + plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language + security_plugin | 1.0 | pg_catalog | provides security functionality +(7 rows) + diff --git a/contrib/gms_tcp/expected/gms_tcp_client.out b/contrib/gms_tcp/expected/gms_tcp_client.out new file mode 100644 index 000000000..478667a58 --- /dev/null +++ b/contrib/gms_tcp/expected/gms_tcp_client.out @@ -0,0 +1,714 @@ +-- +--test in_buffer +-- +create extension gms_tcp; +create or replace function gms_tcp_test_in_buffer() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'in buffer'); + pg_sleep(1); + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 5; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 12; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 13; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 12; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 5; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 17; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 9; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 11; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_line(c); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when gms_tcp_network_error then + raise info 'caught gms_tcp_network_error'; + gms_tcp.close_all_connections(); + + when gms_tcp_bad_argument then + raise info 'caught gms_tcp_bad_argument'; + gms_tcp.close_all_connections(); + + when gms_tcp_buffer_too_small then + raise info 'caught gms_tcp_buffer_too_small'; + gms_tcp.close_all_connections(); + + when gms_tcp_end_of_input then + raise info 'caught gms_tcp_end_of_input'; + gms_tcp.close_all_connections(); + + when gms_tcp_transfer_timeout then + raise info 'caught gms_tcp_transfer_timeout'; + gms_tcp.close_all_connections(); + + when gms_tcp_partial_multibyte_char then + raise info 'caught gms_tcp_partial_multibyte_char'; + gms_tcp.close_all_connections(); + + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +-- +--test read data +-- +--get line +create or replace function gms_tcp_test_get_line() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'get line'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_line(c, true); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +--get text +create or replace function gms_tcp_test_get_text() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'get text'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_text(c, 17, true); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +--get raw +create or replace function gms_tcp_test_get_raw() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data raw; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'get raw'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_raw(c, 4, true); + raise info 'available: %, rcv: %.', num, data; + end if; + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_raw(c, 8); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +--read line +create or replace function gms_tcp_test_read_line() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'read line'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + gms_tcp.read_line(c, data, len, true); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.available(c,1); + if num > 0 then + gms_tcp.read_line(c, data, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +--read text +create or replace function gms_tcp_test_read_text() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; + out_len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'read text'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + out_len = 18; + gms_tcp.read_text(c, data, len, out_len, true); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +create or replace function gms_tcp_test_read_raw() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data raw; + len integer; + out_len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'read raw'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + out_len = 3; + gms_tcp.read_raw(c, data, len, out_len, true); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.available(c,1); + if num > 0 then + out_len = 4; + gms_tcp.read_raw(c, data, len, out_len, true); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.available(c,1); + if num > 0 then + out_len = 8; + gms_tcp.read_raw(c, data, len, out_len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +--write line +create or replace function gms_tcp_test_write_line() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + newline=>'LF', + tx_timeout=>10); + num = gms_tcp.write_line(c, 'write line'); + pg_sleep(1); + num = gms_tcp.write_line(c, '0123456789'); + + num = gms_tcp.available(c,1); + if num > 0 then + gms_tcp.read_line(c, data, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +--write text +create or replace function gms_tcp_test_write_text() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + --out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_text(c, 'write text', 10); + pg_sleep(1); + num = gms_tcp.write_text(c, '0123456789', 6); + + num = gms_tcp.available(c,1); + if num > 0 then + gms_tcp.read_line(c, data, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +create or replace function gms_tcp_test_error_in_buffer_size() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>40480, + out_buffer_size=>20480, + newline=>'lf', + tx_timeout=>10); + gms_tcp.close_all_connections(); + +exception + when gms_tcp_network_error then + raise info 'caught gms_tcp_network_error'; + gms_tcp.close_all_connections(); + + when gms_tcp_bad_argument then + raise info 'caught gms_tcp_bad_argument'; + gms_tcp.close_all_connections(); + + when gms_tcp_buffer_too_small then + raise info 'caught gms_tcp_buffer_too_small'; + gms_tcp.close_all_connections(); + + when gms_tcp_end_of_input then + raise info 'caught gms_tcp_end_of_input'; + gms_tcp.close_all_connections(); + + when gms_tcp_transfer_timeout then + raise info 'caught gms_tcp_transfer_timeout'; + gms_tcp.close_all_connections(); + + when gms_tcp_partial_multibyte_char then + raise info 'caught gms_tcp_partial_multibyte_char'; + gms_tcp.close_all_connections(); + + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +create or replace function gms_tcp_test_error_out_buffer_size() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>40480, + newline=>'lf', + tx_timeout=>10); + gms_tcp.close_all_connections(); + +exception + when gms_tcp_network_error then + raise info 'caught gms_tcp_network_error'; + gms_tcp.close_all_connections(); + + when gms_tcp_bad_argument then + raise info 'caught gms_tcp_bad_argument'; + gms_tcp.close_all_connections(); + + when gms_tcp_buffer_too_small then + raise info 'caught gms_tcp_buffer_too_small'; + gms_tcp.close_all_connections(); + + when gms_tcp_end_of_input then + raise info 'caught gms_tcp_end_of_input'; + gms_tcp.close_all_connections(); + + when gms_tcp_transfer_timeout then + raise info 'caught gms_tcp_transfer_timeout'; + gms_tcp.close_all_connections(); + + when gms_tcp_partial_multibyte_char then + raise info 'caught gms_tcp_partial_multibyte_char'; + gms_tcp.close_all_connections(); + + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +-- +--char_set +-- +create or replace function gms_tcp_test_char_set() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + cset=>'gbk', + tx_timeout=>10); + num = gms_tcp.write_line(c, 'char set'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_line(c); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +create or replace function gms_tcp_test_quit() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'quit'); + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; +select pg_sleep(5); + pg_sleep +---------- + +(1 row) + +select gms_tcp_test_in_buffer(); +INFO: available: 8, rcv: aaaab(5). +CONTEXT: referenced column: gms_tcp_test_in_buffer +INFO: available: 11, rcv: bbbccccdddd(12). +CONTEXT: referenced column: gms_tcp_test_in_buffer +INFO: available: 8, rcv: eeeeffff(13). +CONTEXT: referenced column: gms_tcp_test_in_buffer +INFO: available: 8, rcv: gggghhhh(12). +CONTEXT: referenced column: gms_tcp_test_in_buffer +INFO: available: 8, rcv: 01234(5). +CONTEXT: referenced column: gms_tcp_test_in_buffer +INFO: available: 11, rcv: 567aaaabbbb(17). +CONTEXT: referenced column: gms_tcp_test_in_buffer +INFO: available: 8, rcv: ccccdddd(9). +CONTEXT: referenced column: gms_tcp_test_in_buffer +INFO: available: 8, rcv: eeeeffff(11). +CONTEXT: referenced column: gms_tcp_test_in_buffer +INFO: available: 8, rcv: gggghhhh. +CONTEXT: referenced column: gms_tcp_test_in_buffer + gms_tcp_test_in_buffer +------------------------ + +(1 row) + +select gms_tcp_test_get_line(); +INFO: available: 28, rcv: get line, abcdefg1234567890 +. +CONTEXT: referenced column: gms_tcp_test_get_line + gms_tcp_test_get_line +----------------------- + +(1 row) + +select gms_tcp_test_get_text(); +INFO: available: 28, rcv: get text, abcdefg. +CONTEXT: referenced column: gms_tcp_test_get_text + gms_tcp_test_get_text +----------------------- + +(1 row) + +select gms_tcp_test_get_raw(); +INFO: available: 10, rcv: 01020304. +CONTEXT: referenced column: gms_tcp_test_get_raw +INFO: available: 10, rcv: 0102030405060708. +CONTEXT: referenced column: gms_tcp_test_get_raw + gms_tcp_test_get_raw +---------------------- + +(1 row) + +select gms_tcp_test_read_line(); +INFO: available: 29, rcv: read line, abcdefg1234567890 +(29). +CONTEXT: referenced column: gms_tcp_test_read_line + gms_tcp_test_read_line +------------------------ + +(1 row) + +select gms_tcp_test_read_text(); +INFO: available: 29, rcv: read text, abcdefg(18). +CONTEXT: referenced column: gms_tcp_test_read_text + gms_tcp_test_read_text +------------------------ + +(1 row) + +select gms_tcp_test_read_raw(); +INFO: available: 10, rcv: 010203(6). +CONTEXT: referenced column: gms_tcp_test_read_raw +INFO: available: 10, rcv: 01020304(8). +CONTEXT: referenced column: gms_tcp_test_read_raw +INFO: available: 10, rcv: 0102030405060708(16). +CONTEXT: referenced column: gms_tcp_test_read_raw + gms_tcp_test_read_raw +----------------------- + +(1 row) + +select gms_tcp_test_write_line(); + gms_tcp_test_write_line +------------------------- + +(1 row) + +select gms_tcp_test_write_text(); + gms_tcp_test_write_text +------------------------- + +(1 row) + +select gms_tcp_test_error_in_buffer_size(); +INFO: caught gms_tcp_bad_argument +CONTEXT: referenced column: gms_tcp_test_error_in_buffer_size + gms_tcp_test_error_in_buffer_size +----------------------------------- + +(1 row) + +select gms_tcp_test_error_out_buffer_size(); +INFO: caught gms_tcp_bad_argument +CONTEXT: referenced column: gms_tcp_test_error_out_buffer_size + gms_tcp_test_error_out_buffer_size +------------------------------------ + +(1 row) + +select gms_tcp_test_char_set(); +INFO: available: 7, rcv: abcdefg. +CONTEXT: referenced column: gms_tcp_test_char_set + gms_tcp_test_char_set +----------------------- + +(1 row) + +select gms_tcp_test_quit(); + gms_tcp_test_quit +------------------- + +(1 row) + +drop function gms_tcp_test_in_buffer(); +drop function gms_tcp_test_get_line(); +drop function gms_tcp_test_get_text(); +drop function gms_tcp_test_get_raw(); +drop function gms_tcp_test_read_line(); +drop function gms_tcp_test_read_text(); +drop function gms_tcp_test_read_raw(); +drop function gms_tcp_test_write_line(); +drop function gms_tcp_test_write_text(); +drop function gms_tcp_test_error_in_buffer_size(); +drop function gms_tcp_test_error_out_buffer_size(); +drop function gms_tcp_test_char_set(); +drop function gms_tcp_test_quit(); diff --git a/contrib/gms_tcp/gms_tcp--1.0.sql b/contrib/gms_tcp/gms_tcp--1.0.sql new file mode 100644 index 000000000..e36bff79f --- /dev/null +++ b/contrib/gms_tcp/gms_tcp--1.0.sql @@ -0,0 +1,330 @@ +-- +-- create schema +-- +CREATE SCHEMA gms_tcp; + +-- +-- create type connection +-- +CREATE TYPE gms_tcp.connection; + +CREATE FUNCTION gms_tcp.connection_in(cstring) +RETURNS gms_tcp.connection +AS 'MODULE_PATHNAME','gms_tcp_connection_in' +LANGUAGE C STRICT NOT FENCED; + +CREATE FUNCTION gms_tcp.connection_out(gms_tcp.connection) +RETURNS cstring +AS 'MODULE_PATHNAME','gms_tcp_connection_out' +LANGUAGE C STRICT NOT FENCED; + +CREATE TYPE gms_tcp.connection( + internallength = 512, + input = gms_tcp.connection_in, + output = gms_tcp.connection_out +); + +-- +-- function: crlf +-- +CREATE FUNCTION gms_tcp.crlf() +RETURNS varchar2 +AS 'MODULE_PATHNAME','gms_tcp_crlf' +LANGUAGE C STRICT NOT FENCED; + +-- +-- function: available +-- determines the number of bytes available for reading from a tcp/ip connection. +-- +CREATE FUNCTION gms_tcp.available_real(c in gms_tcp.connection, + timeout in int) +RETURNS integer +AS 'MODULE_PATHNAME','gms_tcp_available_real' +LANGUAGE C STRICT NOT FENCED; + +create or replace function gms_tcp.available(c in gms_tcp.connection, + timeout in int default 0) + returns integer + language plpgsql +as $function$ +begin + return gms_tcp.available_real(c, timeout); +end; +$function$; + +-- +-- funciton: close_all_connections +-- +CREATE FUNCTION gms_tcp.close_all_connections() +RETURNS void +AS 'MODULE_PATHNAME','gms_tcp_close_all_connections' +LANGUAGE C STRICT NOT FENCED; + +-- +-- funciton: close_connection +-- +CREATE FUNCTION gms_tcp.close_connection(c in gms_tcp.connection) +RETURNS void +AS 'MODULE_PATHNAME','gms_tcp_close_connection' +LANGUAGE C STRICT NOT FENCED; + +-- +-- function: flush +-- transmits immediately to the server all data in the output buffer, if a buffer is used. +-- +CREATE FUNCTION gms_tcp.flush(c in gms_tcp.connection) +RETURNS void +AS 'MODULE_PATHNAME','gms_tcp_flush' +LANGUAGE C STRICT NOT FENCED; + +-- +-- function: get_line +-- +CREATE FUNCTION gms_tcp.get_line_real(c in gms_tcp.connection, + remove_crlf in boolean, + peek in boolean, + ch_charset in boolean default false) +RETURNS text +AS 'MODULE_PATHNAME','gms_tcp_get_line_real' +LANGUAGE C STRICT NOT FENCED; + +create or replace function gms_tcp.get_line(c in gms_tcp.connection, + remove_crlf in boolean default false, + peek in boolean default false) + returns text + language plpgsql +as $function$ +begin + return gms_tcp.get_line_real(c, remove_crlf, peek, false); +end; +$function$; + +-- +-- function: get_raw +-- +CREATE FUNCTION gms_tcp.get_raw_real(c in gms_tcp.connection, + len in integer, + peek in boolean) +RETURNS raw +AS 'MODULE_PATHNAME','gms_tcp_get_raw_real' +LANGUAGE C STRICT NOT FENCED; + +create or replace function gms_tcp.get_raw(c in gms_tcp.connection, + len in integer default 1, + peek in boolean default false) + returns raw + language plpgsql +as $function$ +begin + return gms_tcp.get_raw_real(c, len, peek); +end; +$function$; + +-- +-- function: get_text +-- +CREATE FUNCTION gms_tcp.get_text_real(c in gms_tcp.connection, + len in integer, + peek in boolean, + ch_charset in boolean default false) +RETURNS text +AS 'MODULE_PATHNAME','gms_tcp_get_text_real' +LANGUAGE C STRICT NOT FENCED; + +create or replace function gms_tcp.get_text(c in gms_tcp.connection, + len in integer default 1, + peek in boolean default false) + returns text + language plpgsql +as $function$ +begin + return gms_tcp.get_text_real(c, len, peek, false); +end; +$function$; + +-- +-- function: open_connection +-- +CREATE FUNCTION gms_tcp.open_connection_real(remote_host in varchar2, + remote_port in integer, + local_host in varchar2 default 0, + local_port in integer default 0, + in_buffer_size in integer default 0, + out_buffer_size in integer default 0, + cset in varchar2 default 0, + newline in varchar2 default 'CRLF', + tx_timeout in integer default 2147483647) +RETURNS gms_tcp.connection +AS 'MODULE_PATHNAME','gms_tcp_open_connection' +LANGUAGE C STRICT NOT FENCED; + +CREATE FUNCTION gms_tcp.open_connection(remote_host in varchar2, + remote_port in integer, + local_host in varchar2 default null, + local_port in integer default null, + in_buffer_size in integer default null, + out_buffer_size in integer default null, + cset in varchar2 default null, + newline in varchar2 default 'CRLF', + tx_timeout in integer default null) +RETURNS gms_tcp.connection +as $$ +declare + local_host_tmp varchar2; + local_port_tmp integer; + in_buffer_size_tmp integer; + out_buffer_size_tmp integer; + cset_tmp varchar2; + newline_tmp varchar2; + tx_timeout_tmp integer; +begin + if remote_host is null or remote_port is null then + raise exception 'error input, remote_host or remote_port is null'; + end if; + + if local_host is null then + local_host_tmp = 0; + else + local_host_tmp = local_host; + end if; + + if local_port is null then + local_port_tmp = 0; + else + local_port_tmp = local_port; + end if; + + if in_buffer_size is null then + in_buffer_size_tmp = 0; + else + in_buffer_size_tmp = in_buffer_size; + end if; + + if out_buffer_size is null then + out_buffer_size_tmp = 0; + else + out_buffer_size_tmp = out_buffer_size; + end if; + + if cset is null then + cset_tmp = 0; + else + cset_tmp = cset; + end if; + + if newline is null then + newline_tmp = 'CRLF'; + else + newline_tmp = newline; + end if; + + if tx_timeout is null then + tx_timeout_tmp = 2147483647; + else + tx_timeout_tmp = tx_timeout; + end if; + + return gms_tcp.open_connection_real(remote_host, + remote_port, + local_host_tmp, + local_port_tmp, + in_buffer_size_tmp, + out_buffer_size_tmp, + cset_tmp, + newline_tmp, + tx_timeout_tmp); +end; +$$ LANGUAGE plpgsql; + +-- +-- function: read_line +-- +CREATE OR REPLACE PROCEDURE gms_tcp.read_line(c in gms_tcp.connection, + data out varchar2, + len out integer, + remove_crlf in boolean default false, + peek in boolean default false) +as +begin + data = gms_tcp.get_line_real(c, remove_crlf, peek, true); + len = length(data); +end; + +-- +-- function: read_raw +-- +CREATE OR REPLACE PROCEDURE gms_tcp.read_raw(c in gms_tcp.connection, + data out raw, + data_len out integer, + len in integer default 1, + peek in boolean default false) +as +begin + data = gms_tcp.get_raw_real(c, len, peek); + data_len = length(data); +end; + +-- +-- function: read_text +-- +CREATE OR REPLACE PROCEDURE gms_tcp.read_text(c in gms_tcp.connection, + data out varchar2, + data_len out integer, + len in integer default 1, + peek in boolean default false) +as +begin + data = gms_tcp.get_text_real(c, len, peek, true); + data_len = length(data); +end; + +-- +-- function: write_line +-- +CREATE FUNCTION gms_tcp.write_line(c in gms_tcp.connection, + data in varchar2) +RETURNS integer +AS 'MODULE_PATHNAME','gms_tcp_write_line' +LANGUAGE C STRICT NOT FENCED; + +-- +-- function: write_raw +-- +CREATE FUNCTION gms_tcp.write_raw_real(c in gms_tcp.connection, + data in raw, + len in integer) +RETURNS integer +AS 'MODULE_PATHNAME','gms_tcp_write_raw_real' +LANGUAGE C STRICT NOT FENCED; + +create or replace function gms_tcp.write_raw(c in gms_tcp.connection, + data in raw, + len in integer default 0) + returns integer + language plpgsql +as $function$ +begin + return gms_tcp.write_raw_real(c, data, len); +end; +$function$; + +-- +-- function: write_text +-- +CREATE FUNCTION gms_tcp.write_text_real(c in gms_tcp.connection, + data in varchar2, + len in integer default null) +RETURNS integer +AS 'MODULE_PATHNAME','gms_tcp_write_text_real' +LANGUAGE C STRICT NOT FENCED; + +create or replace function gms_tcp.write_text(c in gms_tcp.connection, + data in varchar2, + len in integer default 0) + returns integer + language plpgsql +as $function$ +begin + return gms_tcp.write_text_real(c, data, len); +end; +$function$; diff --git a/contrib/gms_tcp/gms_tcp.control b/contrib/gms_tcp/gms_tcp.control new file mode 100644 index 000000000..a83eca285 --- /dev/null +++ b/contrib/gms_tcp/gms_tcp.control @@ -0,0 +1,5 @@ +# gms_tcp extension +comment = 'provides TCP/IP client=side access functionality in PL/SQL' +default_version = '1.0' +module_pathname = '$libdir/gms_tcp' +relocatable = true diff --git a/contrib/gms_tcp/gms_tcp.cpp b/contrib/gms_tcp/gms_tcp.cpp new file mode 100644 index 000000000..c65a5e197 --- /dev/null +++ b/contrib/gms_tcp/gms_tcp.cpp @@ -0,0 +1,1499 @@ +/* + * This code implements the functons of gms_tcp + */ + +#include "gms_tcp.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(gms_tcp_crlf); +PG_FUNCTION_INFO_V1(gms_tcp_available_real); +PG_FUNCTION_INFO_V1(gms_tcp_close_all_connections); +PG_FUNCTION_INFO_V1(gms_tcp_close_connection); +PG_FUNCTION_INFO_V1(gms_tcp_flush); +PG_FUNCTION_INFO_V1(gms_tcp_get_line_real); +PG_FUNCTION_INFO_V1(gms_tcp_get_raw_real); +PG_FUNCTION_INFO_V1(gms_tcp_get_text_real); +PG_FUNCTION_INFO_V1(gms_tcp_open_connection); +PG_FUNCTION_INFO_V1(gms_tcp_write_line); +PG_FUNCTION_INFO_V1(gms_tcp_write_raw_real); +PG_FUNCTION_INFO_V1(gms_tcp_write_text_real); +PG_FUNCTION_INFO_V1(gms_tcp_connection_in); +PG_FUNCTION_INFO_V1(gms_tcp_connection_out); + +static void *gms_tcp_alloc(int len); +static inline List *gms_tcp_lappend(List* list, void* datum); +static inline void gms_tcp_init_data_buffer(GMS_TCP_CONNECTION_BUFFER *c_buffer, int32 buffer_size); +static bool gms_tcp_get_addr_by_hostname(char *remotehost, struct sockaddr_in *saddr); +static bool gms_tcp_change_remote_host(char *remotehost, struct sockaddr_in *saddr); +static int gms_tcp_connect(GMS_TCP_CONNECTION_STATE *c_state); +static void gms_tcp_store_connection(GMS_TCP_CONNECTION_STATE *c_state); +static GMS_TCP_CONNECTION_STATE *gms_tcp_get_connection_state(GMS_TCP_CONNECTION *c); +static void gms_tcp_put_data_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_in, int32 data_len); +static int32 gms_tcp_put_data_to_out_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_out, int32 data_len); +static void gms_tcp_release_connection_buffer(GMS_TCP_CONNECTION_BUFFER *buffer); +static void gms_tcp_release_connection_state(std::shared_ptr data); +static void gms_tcp_remove_newline(GMS_TCP_CONNECTION_STATE *c_state, char *data, int32 data_len); +static int32 gms_tcp_get_data_from_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt); +static void gms_tcp_close_connection_by_state(GMS_TCP_CONNECTION_STATE *c_state); +static int32 gms_tcp_get_available_bytes(GMS_TCP_CONNECTION_STATE *c_state); +static inline int gms_tcp_set_connection_rcv_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout); +static inline int gms_tcp_set_connection_send_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout); +static void gms_tcp_wait(GMS_TCP_CONNECTION_STATE *c_state, bool report_err); +static void gms_tcp_get_data_from_connection_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt); +static bool gms_tcp_check_charset(char *charset); +static inline int32 gms_tcp_get_in_buffer_data_bytes(GMS_TCP_CONNECTION_BUFFER *in_buffer); +static char *gms_tcp_encode_data_by_charset(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt, bool is_write); +static char *gms_tcp_encrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data); +static char *gms_tcp_decrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data); +static void gms_tcp_get_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt); +static int32 gms_tcp_write_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt); +static void gms_tcp_check_connection_null(GMS_TCP_CONNECTION_STATE *c_state); + +static void gms_tcp_comm_memcopy_str(char *dst, int dst_len, char *src, int *offset); +static void gms_tcp_comm_memcopy_int(char *dst, int dst_len, char *src, int src_len, int *offset); +static int gms_tcp_comm_get_str(text *data_t, char **data, int data_len_max, int *get_data_len, int *total_len); +static bool gms_tcp_comm_cmp_str(const char *str_1, const char *str_2); + + +static void * +gms_tcp_alloc(int len) +{ + void *result = NULL; + + MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt); + result = palloc0(len); + MemoryContextSwitchTo(old_context); + + return result; +} + +static inline List * +gms_tcp_lappend(List* list, void* datum) +{ + List *result = NULL; + + MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt); + result = lappend(list, datum); + MemoryContextSwitchTo(old_context); + + return result; +} + +static inline List * +gms_tcp_lappend_int(List* list, int datum) +{ + List *result = NULL; + + MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt); + result = lappend_int(list, datum); + MemoryContextSwitchTo(old_context); + + return result; +} + +void +gms_tcp_get_connection_info(GMS_TCP_CONNECTION *c, GMS_TCP_CONNECTION_INFO *c_info) +{ + GMS_TCP_CONNECTION_HEAD *c_h = &c->c_h; + int offset = 0; + + if (c_h->remote_host_len) { + c_info->remote_host = (char *)gms_tcp_alloc(c_h->remote_host_len); + gms_tcp_comm_memcopy_str(c_info->remote_host, c_h->remote_host_len, c->data + offset, &offset); + } + if (c_h->remote_port_len) { + gms_tcp_comm_memcopy_int((char *)&c_info->remote_port, c_h->remote_port_len, c->data + offset, c_h->remote_port_len, &offset); + } + if (c_h->local_host_len) { + c_info->local_host = (char *)gms_tcp_alloc(c_h->local_host_len); + gms_tcp_comm_memcopy_str(c_info->local_host, c_h->local_host_len, c->data + offset, &offset); + } + if (c_h->local_port_len) { + gms_tcp_comm_memcopy_int((char *)&c_info->local_port, c_h->local_port_len, c->data + offset, c_h->local_port_len, &offset); + } + if (c_h->in_buffer_size_len) { + gms_tcp_comm_memcopy_int((char *)&c_info->in_buffer_size, c_h->in_buffer_size_len, c->data + offset, c_h->in_buffer_size_len, &offset); + } + if (c_h->out_buffer_size_len) { + gms_tcp_comm_memcopy_int((char *)&c_info->out_buffer_size, c_h->out_buffer_size_len, c->data + offset, c_h->out_buffer_size_len, &offset); + } + if (c_h->charset_len) { + c_info->charset = (char *)gms_tcp_alloc(c_h->charset_len); + gms_tcp_comm_memcopy_str(c_info->charset, c_h->charset_len, c->data + offset, &offset); + } + if (c_h->newline_len) { + c_info->newline = (char *)gms_tcp_alloc(c_h->newline_len); + gms_tcp_comm_memcopy_str(c_info->newline, c_h->newline_len, c->data + offset, &offset); + } + if (c_h->tx_timeout_len) { + gms_tcp_comm_memcopy_int((char *)&c_info->tx_timeout, c_h->tx_timeout_len, c->data + offset, c_h->tx_timeout_len, &offset); + } + if (c_h->fd_len) { + gms_tcp_comm_memcopy_int((char *)&c_info->fd, c_h->fd_len, c->data + offset, c_h->fd_len, &offset); + } + + return; +} + +void +gms_tcp_get_connection_head_by_info(GMS_TCP_CONNECTION_INFO *c_info, GMS_TCP_CONNECTION_HEAD *c_h) +{ + if (c_info->remote_host) { + c_h->remote_host_len = strlen(c_info->remote_host) + 1; + c_h->total_len += c_h->remote_host_len; + } + if (c_info->remote_port) { + c_h->remote_port_len = sizeof(c_info->remote_port); + c_h->total_len += c_h->remote_port_len; + } + if (c_info->local_host) { + c_h->local_host_len = strlen(c_info->local_host) + 1; + c_h->total_len += c_h->local_host_len; + } + if (c_info->local_port) { + c_h->local_port_len = sizeof(c_info->local_port); + c_h->total_len += c_h->local_port_len; + } + if (c_info->in_buffer_size) { + c_h->in_buffer_size_len = sizeof(c_info->in_buffer_size); + c_h->total_len += c_h->in_buffer_size_len; + } + if (c_info->out_buffer_size) { + c_h->out_buffer_size_len = sizeof(c_info->out_buffer_size); + c_h->total_len += c_h->out_buffer_size_len; + } + if (c_info->charset) { + c_h->charset_len = strlen(c_info->charset) + 1; + c_h->total_len += c_h->charset_len; + } + if (c_info->newline) { + c_h->newline_len = strlen(c_info->newline) + 1; + c_h->total_len += c_h->newline_len; + } + if (c_info->tx_timeout) { + c_h->tx_timeout_len = sizeof(c_info->tx_timeout); + c_h->total_len += c_h->tx_timeout_len; + } + if (c_info->fd) { + c_h->fd_len = sizeof(c_info->fd); + c_h->total_len += c_h->fd_len; + } + + return; +} + +static inline void +gms_tcp_init_data_buffer(GMS_TCP_CONNECTION_BUFFER *c_buffer, int32 buffer_size) +{ + errno_t rc = EOK; + rc = memset_s(c_buffer, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, sizeof(GMS_TCP_CONNECTION_BUFFER)); + securec_check_c(rc, "\0", "\0"); + if (buffer_size) { + c_buffer->max_buffer_size = buffer_size; + c_buffer->free_space = buffer_size; + c_buffer->data = (char *)gms_tcp_alloc(buffer_size); + } +} + +static bool +gms_tcp_get_addr_by_hostname(char *remotehost, struct sockaddr_in *saddr) +{ + struct hostent *hptr = gethostbyname(remotehost); + if (!hptr) { + return false; + } + + if (hptr->h_addrtype == AF_INET) { + saddr->sin_addr = *((struct in_addr *)hptr->h_addr_list[0]); + return true; + } + + return false; +} + +static bool +gms_tcp_change_remote_host(char *remotehost, struct sockaddr_in *saddr) +{ + int ret = 0; + + ret = inet_aton(remotehost, &saddr->sin_addr); + if (!ret) { + return gms_tcp_get_addr_by_hostname(remotehost, saddr); + } + + return true; +} + +static int +gms_tcp_connect(GMS_TCP_CONNECTION_STATE *c_state) +{ + c_state->fd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in saddr; + int res = 0; + errno_t rc = EOK; + + rc = memset_s(&saddr, GMS_TCP_MAX_IN_BUFFER_SIZE, 0,sizeof(saddr)); + securec_check_c(rc, "\0", "\0"); + saddr.sin_family = AF_INET; + saddr.sin_port = htons(c_state->c_info.remote_port); + saddr.sin_addr = c_state->c_info.saddr.sin_addr; + + res = connect(c_state->fd, (struct sockaddr *)&saddr, sizeof(saddr)); + if (res) { + c_state->fd = 0; + return GMS_TCP_CONNECT_FAIL; + } + + c_state->state = GMS_TCP_CONNECT_OK; + + return GMS_TCP_OK; +} + +static void +gms_tcp_store_connection(GMS_TCP_CONNECTION_STATE *c_state) +{ + PortalSpecialData *node; + node = (PortalSpecialData *)gms_tcp_alloc(sizeof(PortalSpecialData)); + node->data = std::shared_ptr(c_state); + node->cleanup = gms_tcp_release_connection_state; + GMS_TCP_FD_BAK_LIST = gms_tcp_lappend(GMS_TCP_FD_BAK_LIST, node); +} + +static GMS_TCP_CONNECTION_STATE * +gms_tcp_get_connection_state(GMS_TCP_CONNECTION *c) +{ + ListCell *lc = NULL; + GMS_TCP_CONNECTION_INFO *c_info = NULL; + GMS_TCP_CONNECTION_STATE *result = NULL; + + c_info = (GMS_TCP_CONNECTION_INFO *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_INFO)); + gms_tcp_get_connection_info(c, c_info); + + foreach (lc, GMS_TCP_FD_BAK_LIST) { + PortalSpecialData *node = (PortalSpecialData *)lfirst(lc); + GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast(node->data).get(); + + if (c_info->fd == c_state->fd) { + result = c_state; + break; + } + } + + gms_tcp_release_connection_info(c_info); + pfree(c_info); + + return result; +} + +GMS_TCP_CONNECTION_STATE * +gms_tcp_get_connection_state_by_fd(int fd) +{ + ListCell *lc = NULL; + GMS_TCP_CONNECTION_STATE *result = NULL; + + foreach (lc, GMS_TCP_FD_BAK_LIST) { + PortalSpecialData *node = (PortalSpecialData *)lfirst(lc); + GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast(node->data).get(); + + if (fd == c_state->fd) { + result = c_state; + break; + } + } + + return result; +} + +static void +gms_tcp_put_data_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_in, int32 data_len) +{ + GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer; + int32 left_len = 0; + error_t errorno = EOK; + + if (in_buffer->free_space <= data_len) { + gms_tcp_close_connection_by_state(c_state); + ereport(ERROR, + (errcode(ERRCODE_BUFFER_TOO_SMALL), + errmsg("input bufer is full."))); + } + + if (in_buffer->end > in_buffer->start) { + /* |----start data end----| */ + left_len = in_buffer->max_buffer_size - in_buffer->end; + if (left_len > data_len) { + errorno = memcpy_s(&in_buffer->data[in_buffer->end], left_len, data_in, data_len); + securec_check_c(errorno, "\0", "\0"); + in_buffer->end += data_len; + } else { + errorno = memcpy_s(&in_buffer->data[in_buffer->end], left_len, data_in, left_len); + securec_check_c(errorno, "\0", "\0"); + errorno = memcpy_s(in_buffer->data, in_buffer->start, &data_in[left_len], (data_len - left_len)); + securec_check_c(errorno, "\0", "\0"); + in_buffer->end = (data_len - left_len); + } + } else if (in_buffer->start > in_buffer->end){ + /* | data end----start data | */ + errorno = memcpy_s(&in_buffer->data[in_buffer->end], in_buffer->start - in_buffer->end, data_in, data_len); + securec_check_c(errorno, "\0", "\0"); + in_buffer->end += data_len; + } else { + /* in buffer is empty, reset it. */ + errorno = memset_s(in_buffer->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, in_buffer->max_buffer_size); + securec_check(errorno,"\0","\0"); + in_buffer->free_space = in_buffer->max_buffer_size; + in_buffer->start = 0; + in_buffer->end = 0; + errorno = memcpy_s(in_buffer->data, in_buffer->max_buffer_size, data_in, data_len); + securec_check(errorno,"\0","\0"); + in_buffer->end += data_len; + } + + in_buffer->free_space -= data_len; + + return; +} + +static int32 +gms_tcp_put_data_to_out_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_out, int32 data_len) +{ + GMS_TCP_CONNECTION_BUFFER *out_buffer = &c_state->out_buffer; + errno_t rc = EOK; + + if (out_buffer->free_space < data_len) { + gms_tcp_close_connection_by_state(c_state); + ereport(ERROR, + (errcode(ERRCODE_BUFFER_TOO_SMALL), + errmsg("output bufer is full."))); + } + + rc = memcpy_s(&out_buffer->data[out_buffer->end], out_buffer->max_buffer_size - out_buffer->end, data_out, data_len); + securec_check_c(rc, "\0", "\0"); + out_buffer->end += data_len; + out_buffer->free_space -= data_len; + + return data_len; +} + +void +gms_tcp_release_connection_info(GMS_TCP_CONNECTION_INFO *c_info) +{ + if (c_info->remote_host) { + pfree(c_info->remote_host); + c_info->remote_host = NULL; + } + + if (c_info->local_host) { + pfree(c_info->local_host); + c_info->local_host = NULL; + } + + if (c_info->charset) { + pfree(c_info->charset); + c_info->charset = NULL; + } + + if (c_info->newline) { + pfree(c_info->newline); + c_info->newline = NULL; + } + +} + +static void +gms_tcp_release_connection_buffer(GMS_TCP_CONNECTION_BUFFER *buffer) +{ + if (buffer->data) { + pfree(buffer->data); + buffer->data = NULL; + } + + buffer->free_space = 0; + buffer->start = 0; + buffer->end = 0; +} + +static void +gms_tcp_release_connection_state(std::shared_ptr data) +{ + GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast(data).get(); + + if (c_state->fd) { + close(c_state->fd); + } + + gms_tcp_release_connection_buffer(&c_state->in_buffer); + gms_tcp_release_connection_buffer(&c_state->out_buffer); + gms_tcp_release_connection_info(&c_state->c_info); + pfree(c_state); +} + +static void +gms_tcp_remove_newline(GMS_TCP_CONNECTION_STATE *c_state, char *data, int32 data_len) +{ + int32 i = 0; + char *newline = NULL; + int newline_len = 0; + errno_t rc = EOK; + + Assert(c_state->c_info.newline); + if (strcmp(c_state->c_info.newline, "CRLF") == 0) { + newline_len = 2; + newline = "\r\n"; + } else { + newline_len = strlen(c_state->c_info.newline); + newline = c_state->c_info.newline; + } + + while (i <= data_len - newline_len) { + if (strncmp(&data[i], newline, newline_len) == 0) { + rc = memset_s(&data[i], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, newline_len); + securec_check_c(rc, "\0", "\0"); + i += newline_len; + + continue; + } + i++; + } +} + +static int32 +gms_tcp_get_data_from_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt) +{ + GMS_TCP_CONNECTION_BUFFER *in_buffer = NULL; + errno_t rc = EOK; + + in_buffer = &c_state->in_buffer; + + if (in_buffer->end == in_buffer->start) { + gms_tcp_get_data_from_connection_to_in_buffer(c_state, data_opt); + } + + /* |----start data end----| */ + if (in_buffer->end > in_buffer->start) { + int32 rcv_len = (data_opt->len && (data_opt->len < (in_buffer->end - in_buffer->start))) ? + data_opt->len : + (in_buffer->end - in_buffer->start); + rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], rcv_len); + securec_check_c(rc, "\0", "\0"); + data_opt->data_len = rcv_len; + if (!data_opt->peek) { + rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, rcv_len); + securec_check_c(rc, "\0", "\0"); + in_buffer->start += rcv_len; + in_buffer->free_space += rcv_len; + } + } else { + /* | data end----start data | */ + int after_start = in_buffer->max_buffer_size - in_buffer->start; + int32 rcv_len = (data_opt->len && (data_opt->len < (in_buffer->max_buffer_size - in_buffer->free_space))) ? + data_opt->len : + (in_buffer->max_buffer_size - in_buffer->free_space); + if (rcv_len <= after_start) { + rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], rcv_len); + securec_check_c(rc, "\0", "\0"); + if (!data_opt->peek) { + rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, rcv_len); + securec_check_c(rc, "\0", "\0"); + in_buffer->start = (in_buffer->start + rcv_len) % in_buffer->max_buffer_size; + in_buffer->free_space += rcv_len; + } + } else { + int32 left_len = rcv_len - after_start; + rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], after_start); + securec_check_c(rc, "\0", "\0"); + data_opt->data_len += after_start; + rc = memcpy_s(&data_opt->data[after_start], GMS_TCP_MAX_IN_BUFFER_SIZE - after_start, in_buffer->data, left_len); + securec_check_c(rc, "\0", "\0"); + data_opt->data_len += left_len; + if (!data_opt->peek) { + rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, after_start); + securec_check_c(rc, "\0", "\0"); + rc = memset_s(in_buffer->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, left_len); + securec_check_c(rc, "\0", "\0"); + in_buffer->start = left_len; + in_buffer->free_space += rcv_len; + } + } + } + + if (data_opt->remove_crlf) { + gms_tcp_remove_newline(c_state, data_opt->data, data_opt->data_len); + } + + return data_opt->data_len; +} + +static void +gms_tcp_close_connection_by_state(GMS_TCP_CONNECTION_STATE *c_state) +{ + ListCell *lc = NULL; + PortalSpecialData *find = NULL; + + Assert(c_state); + Assert(c_state->state == GMS_TCP_CONNECT_OK); + + foreach (lc, GMS_TCP_FD_BAK_LIST) { + PortalSpecialData *node = (PortalSpecialData *)lfirst(lc); + if (c_state == std::static_pointer_cast(node->data).get()) { + find = node; + break; + } + } + + if (find) { + GMS_TCP_FD_BAK_LIST = list_delete_ptr(GMS_TCP_FD_BAK_LIST, find); + find->cleanup(find->data); + pfree(find); + } +} + +static int32 +gms_tcp_get_available_bytes(GMS_TCP_CONNECTION_STATE *c_state) +{ + int32 bytes_to_read = 0; + + gms_tcp_wait(c_state, false); + + if (ioctl(c_state->fd, FIONREAD, &bytes_to_read) == 0) { + return bytes_to_read; + } + + return 0; +} + +static inline int +gms_tcp_set_connection_rcv_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout) +{ + int ret; + fd_set rd; + struct timeval timeoutval = {timeout, 0}; + + FD_ZERO(&rd); + FD_SET(c_state->fd, &rd); + + ret = select(c_state->fd + 1, &rd, NULL, NULL, &timeoutval); + return ret; +} + +static inline int +gms_tcp_set_connection_send_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout) +{ + int ret; + fd_set wd; + struct timeval timeoutval = {timeout, 0}; + + FD_ZERO(&wd); + FD_SET(c_state->fd, &wd); + + ret = select(c_state->fd + 1, NULL, &wd, NULL, &timeoutval); + return ret; +} + +static void +gms_tcp_wait(GMS_TCP_CONNECTION_STATE *c_state, bool report_err) +{ + int32 timeout = 0; + + if (c_state->c_info.available_wait) { + /* recv data by available function. */ + if (c_state->c_info.available_timeout) { + timeout = c_state->c_info.available_timeout; + } + } else if (c_state->c_info.get_data_wait) { + /* recv data by get/read function. */ + if (c_state->c_info.tx_timeout) { + timeout = c_state->c_info.tx_timeout; + } + } + + if (timeout) { + int ret = gms_tcp_set_connection_rcv_timeout(c_state, timeout); + if (report_err) { + if (ret == 0) { + ereport(ERROR, + (errcode(ERRCODE_TRANSFER_TIMEOUT), + errmsg("recv data timeout."))); + } else if (ret < 0) { + ereport(ERROR, + (errcode(ERRCODE_NETWORK_ERROR), + errmsg("recv data error."))); + } + } + } +} + +static void +gms_tcp_get_data_from_connection_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt) +{ + GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer; + char *data = NULL; + int32 recv_len = 0; + + Assert(in_buffer->max_buffer_size && in_buffer->data); + + data = (char *)palloc0(in_buffer->max_buffer_size); + + gms_tcp_wait(c_state, data_opt->report_err); + + recv_len = recv(c_state->fd, data, in_buffer->max_buffer_size, MSG_DONTWAIT); + if (recv_len > 0) { + char *decrypt_data = NULL; + + if (!data_opt->len) { + decrypt_data = gms_tcp_decrypt_data(c_state, data); + } + + if (decrypt_data) { + gms_tcp_put_data_to_in_buffer(c_state, decrypt_data, strlen(decrypt_data)); + } else { + gms_tcp_put_data_to_in_buffer(c_state, data, recv_len); + } + } else if (data_opt->report_err) { + gms_tcp_close_connection_by_state(c_state); + pfree(data); + ereport(ERROR, + (errcode(ERRCODE_NETWORK_ERROR), + errmsg("recv data error."))); + } + + pfree(data); + + return; +} + +static bool +gms_tcp_check_charset(char *charset) +{ + const char *client_encoding_name = pg_get_client_encoding_name(); + iconv_t cd; + + cd = iconv_open(charset, client_encoding_name); + if (cd == ((iconv_t)(-1))) { + ereport(WARNING, + (errmsg("Can not change string from %s to %s", client_encoding_name, charset))); + return false; + } + + iconv_close(cd); + + return true; +} + +static inline int32 +gms_tcp_get_in_buffer_data_bytes(GMS_TCP_CONNECTION_BUFFER *in_buffer) +{ + return in_buffer->max_buffer_size - in_buffer->free_space; +} + +static char * +gms_tcp_encode_data_by_charset(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt, bool is_write) +{ + iconv_t cd; + char *data_out = NULL; + size_t data_out_len = GMS_TCP_MAX_OUT_BUFFER_SIZE; + char *data_out_p = NULL; + char *data_in_p = NULL; + const char *client_encoding_name = pg_get_client_encoding_name(); + size_t len = data_opt->len ? data_opt->len : strlen(data_opt->data); + + if (gms_tcp_comm_cmp_str(c_state->c_info.charset, client_encoding_name)) { + return NULL; + } + + if (is_write) { + cd = iconv_open(c_state->c_info.charset, client_encoding_name); + } else { + cd = iconv_open(client_encoding_name, c_state->c_info.charset); + } + + if (cd < 0) { + return NULL; + } + + data_out = (char *)palloc0(data_out_len); + data_in_p = data_opt->data; + data_out_p = data_out; + if(iconv(cd, &data_in_p, &len, &data_out_p, &data_out_len) < 0){ + iconv_close(cd); + pfree(data_out); + return NULL; + } + + iconv_close(cd); + return data_out; +} + +static char * +gms_tcp_encrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data) +{ + /* Do not support encrypt now. */ + return NULL; +} + +static char * +gms_tcp_decrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data) +{ + /* Do not support decrypt now. */ + return NULL; +} + +static void +gms_tcp_get_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt) +{ + GMS_TCP_CONNECTION_BUFFER *in_buffer = NULL; + errno_t rc = EOK; + + in_buffer = &c_state->in_buffer; + if (in_buffer->max_buffer_size && in_buffer->data) { + /* if in buffer is used, get data from in buffer. */ + data_opt->data_len = gms_tcp_get_data_from_in_buffer(c_state, data_opt); + c_state->c_info.get_data_wait = false; + /* recv data from connection to inbuffer no wait. */ + data_opt->report_err = false; + gms_tcp_get_data_from_connection_to_in_buffer(c_state, data_opt); + } else { + if (c_state->c_info.tx_timeout) { + int ret = gms_tcp_set_connection_rcv_timeout(c_state, c_state->c_info.tx_timeout); + if (ret == 0) { + ereport(ERROR, + (errcode(ERRCODE_TRANSFER_TIMEOUT), + errmsg("recv data timeout."))); + } else if (ret < 0) { + ereport(ERROR, + (errcode(ERRCODE_NETWORK_ERROR), + errmsg("recv data error."))); + } + } + + data_opt->data_len = recv(c_state->fd, + data_opt->data, + data_opt->len ? data_opt->len : GMS_TCP_MAX_IN_BUFFER_SIZE, + MSG_DONTWAIT | (data_opt->peek ? MSG_PEEK : 0)); + if (data_opt->data_len <= 0) { + gms_tcp_close_connection_by_state(c_state); + ereport(ERROR, + (errcode(ERRCODE_NETWORK_ERROR), + errmsg("recv data error."))); + } + + /* if the data is encrypted, we must get the hold data. */ + if (!data_opt->len) { + char *decrypt_data = gms_tcp_decrypt_data(c_state, data_opt->data); + if (decrypt_data) { + rc = memset_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, GMS_TCP_MAX_IN_BUFFER_SIZE); + securec_check_c(rc, "\0", "\0"); + rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, decrypt_data, strlen(decrypt_data)); + securec_check_c(rc, "\0", "\0"); + data_opt->data_len = strlen(decrypt_data); + } + } + + if (data_opt->remove_crlf) { + gms_tcp_remove_newline(c_state, data_opt->data, data_opt->data_len); + } + } +} + +static int32 +gms_tcp_write_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt) +{ + char *data = NULL; + int32 data_len = 0; + char *encrypt_data = NULL; + errno_t rc = EOK; + + data_len = data_opt->len ? data_opt->len : strlen(data_opt->data); + if (c_state->out_buffer.data) { + bool done = false; + if (data_opt->ch_charset && c_state->c_info.charset) { + + /* change charset, if data is not NULL, send data, or send data_opt->data. */ + data = gms_tcp_encode_data_by_charset(c_state, data_opt, true); + if (data) { + /* encrypt the data, if encrypt_data is not NULL, send encrypt_data, or send data. */ + if (data_opt->encrypt) { + encrypt_data = gms_tcp_encrypt_data(c_state, data); + } + if (encrypt_data) { + data_len = gms_tcp_put_data_to_out_buffer(c_state, encrypt_data, strlen(encrypt_data)); + } else { + data_len = gms_tcp_put_data_to_out_buffer(c_state, data, strlen(data)); + } + pfree(data); + done = true; + } + } + if (!done) { + /* + * the charset is not change, try encrypt the data, + * if encrypt_data is not NULL, send encrypt_data, or send data_opt->data. + */ + if (data_opt->encrypt) { + encrypt_data = gms_tcp_encrypt_data(c_state, data_opt->data); + } + if (encrypt_data) { + data_len = gms_tcp_put_data_to_out_buffer(c_state, encrypt_data, strlen(encrypt_data)); + } else { + data_len = gms_tcp_put_data_to_out_buffer(c_state, data_opt->data, data_len); + } + } + } else { + char *data_t = NULL; + + bool done = false; + if (data_opt->ch_charset && c_state->c_info.charset) { + /* change charset, if data is not NULL, send data, or send data_opt->data. */ + char *data = gms_tcp_encode_data_by_charset(c_state, data_opt, true); + if (data) { + /* encrypt the data, if encrypt_data is not NULL, send encrypt_data, or send data. */ + if (data_opt->encrypt) { + encrypt_data = gms_tcp_encrypt_data(c_state, data); + } + if (encrypt_data) { + data_t = (char *)palloc0(strlen(encrypt_data)); + rc = memcpy_s(data_t, strlen(encrypt_data), encrypt_data, strlen(encrypt_data)); + securec_check_c(rc, "\0", "\0"); + data_len = strlen(encrypt_data); + } else { + data_t = (char *)palloc0(strlen(data)); + rc = memcpy_s(data_t, strlen(data), data, strlen(data)); + securec_check_c(rc, "\0", "\0"); + data_len = strlen(data); + } + pfree(data); + done = true; + } + } + if (!done) { + /* + * the charset is not change, try encrypt the data, + * if encrypt_data is not NULL, send encrypt_data, or send data_opt->data. + */ + if (data_opt->encrypt) { + encrypt_data = gms_tcp_encrypt_data(c_state, data_opt->data); + } + if (encrypt_data) { + data_t = (char *)palloc0(strlen(encrypt_data)); + rc = memcpy_s(data_t, strlen(encrypt_data), encrypt_data, strlen(encrypt_data)); + securec_check_c(rc, "\0", "\0"); + data_len = strlen(encrypt_data); + } else { + data_t = (char *)palloc0(strlen(data_opt->data)); + rc = memcpy_s(data_t, strlen(data_opt->data), data_opt->data, strlen(data_opt->data)); + securec_check_c(rc, "\0", "\0"); + } + } + + if (c_state->c_info.tx_timeout) { + int ret = gms_tcp_set_connection_send_timeout(c_state, c_state->c_info.tx_timeout); + if (ret == 0) { + ereport(ERROR, + (errcode(ERRCODE_TRANSFER_TIMEOUT), + errmsg("recv data timeout."))); + } else if (ret < 0) { + ereport(ERROR, + (errcode(ERRCODE_NETWORK_ERROR), + errmsg("recv data error."))); + } + } + data_len = send(c_state->fd, data_t, data_len, MSG_DONTWAIT); + pfree(data_t); + } + + PG_RETURN_INT32(data_len); +} + +static void +gms_tcp_check_connection_null(GMS_TCP_CONNECTION_STATE *c_state) +{ + if (!c_state) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("get connection fail."))); + } else { + Assert(c_state->state == GMS_TCP_CONNECT_OK); + } +} + +Datum +gms_tcp_crlf(PG_FUNCTION_ARGS) +{ + PG_RETURN_TEXT_P(cstring_to_text("\r\n")); +} + +Datum +gms_tcp_available_real(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + int32 available_timeout = PG_GETARG_INT32(1); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer; + GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; + int32 available = 0; + + gms_tcp_check_connection_null(c_state); + + if (in_buffer->max_buffer_size && in_buffer->data) { + c_state->c_info.available_wait = true; + c_state->c_info.available_timeout = 0; + data_opt.report_err = false; + gms_tcp_get_data_from_connection_to_in_buffer(c_state, &data_opt); + + available = gms_tcp_get_in_buffer_data_bytes(in_buffer); + if (!available && available_timeout) { + c_state->c_info.available_timeout = available_timeout; + gms_tcp_get_data_from_connection_to_in_buffer(c_state, &data_opt); + available = gms_tcp_get_in_buffer_data_bytes(in_buffer); + } + } else { + c_state->c_info.available_wait = true; + c_state->c_info.available_timeout = available_timeout; + available = gms_tcp_get_available_bytes(c_state); + } + + c_state->c_info.available_wait = false; + c_state->c_info.available_timeout = 0; + + PG_RETURN_INT32(available); +} + +Datum gms_tcp_flush(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + GMS_TCP_CONNECTION_BUFFER *out_buffer = NULL; + errno_t rc = EOK; + + gms_tcp_check_connection_null(c_state); + + out_buffer = &c_state->out_buffer; + if (!out_buffer->data || !out_buffer->max_buffer_size) { + PG_RETURN_VOID(); + } + + if (c_state->c_info.tx_timeout) { + int ret = gms_tcp_set_connection_send_timeout(c_state, c_state->c_info.tx_timeout); + if (ret == 0) { + ereport(ERROR, + (errcode(ERRCODE_TRANSFER_TIMEOUT), + errmsg("recv data timeout."))); + } else if (ret < 0) { + ereport(ERROR, + (errcode(ERRCODE_NETWORK_ERROR), + errmsg("recv data error."))); + } + } + send(c_state->fd, out_buffer->data, out_buffer->end, MSG_DONTWAIT); + + rc = memset_s(out_buffer->data, GMS_TCP_MAX_OUT_BUFFER_SIZE, 0, out_buffer->max_buffer_size); + securec_check_c(rc, "\0", "\0"); + out_buffer->start = 0; + out_buffer->end = 0; + out_buffer->free_space = out_buffer->max_buffer_size; + + PG_RETURN_VOID(); +} + +Datum +gms_tcp_close_all_connections(PG_FUNCTION_ARGS) +{ + ListCell* lc = NULL; + + if (!GMS_TCP_FD_BAK_LIST) { + PG_RETURN_VOID(); + } + + foreach (lc, GMS_TCP_FD_BAK_LIST) { + PortalSpecialData *node = (PortalSpecialData *)lfirst(lc); + node->cleanup(node->data); + pfree(node); + } + + list_free_ext(GMS_TCP_FD_BAK_LIST); + + PG_RETURN_VOID(); +} + +Datum +gms_tcp_close_connection(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + + gms_tcp_check_connection_null(c_state); + + gms_tcp_close_connection_by_state(c_state); + + PG_RETURN_VOID(); +} + +Datum +gms_tcp_get_line_real(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + bool remove_crlf = PG_GETARG_BOOL(1); + bool peek = PG_GETARG_BOOL(2); + bool ch_charset = PG_GETARG_BOOL(3); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; + + gms_tcp_check_connection_null(c_state); + + data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE); + data_opt.remove_crlf = remove_crlf; + data_opt.peek = peek; + data_opt.report_err = true; + + c_state->c_info.get_data_wait = true; + gms_tcp_get_data(c_state, &data_opt); + c_state->c_info.get_data_wait = false; + + if (ch_charset && c_state->c_info.charset) { + char *data = gms_tcp_encode_data_by_charset(c_state, &data_opt, false); + if (data) { + pfree(data_opt.data); + PG_RETURN_TEXT_P(cstring_to_text(data)); + } + } + + PG_RETURN_TEXT_P(cstring_to_text(data_opt.data)); +} + +Datum +gms_tcp_get_raw_real(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + int32 len = PG_GETARG_INT32(1); + bool peek = PG_GETARG_BOOL(2); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; + StringInfoData buf; + + gms_tcp_check_connection_null(c_state); + + data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE); + data_opt.len = len; + data_opt.peek = peek; + data_opt.report_err = true; + + c_state->c_info.get_data_wait = true; + gms_tcp_get_data(c_state, &data_opt); + c_state->c_info.get_data_wait = false; + + pq_begintypsend(&buf); + pq_sendbytes(&buf, data_opt.data, data_opt.data_len); + PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); +} + +Datum +gms_tcp_get_text_real(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + int32 len = PG_GETARG_INT32(1); + bool peek = PG_GETARG_BOOL(2); + bool ch_charset = PG_GETARG_BOOL(3); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; + + gms_tcp_check_connection_null(c_state); + + data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE); + data_opt.len = len; + data_opt.peek = peek; + data_opt.ch_charset = ch_charset; + data_opt.report_err = true; + + c_state->c_info.get_data_wait = true; + gms_tcp_get_data(c_state, &data_opt); + c_state->c_info.get_data_wait = false; + + if (ch_charset && c_state->c_info.charset) { + char *data = gms_tcp_encode_data_by_charset(c_state, &data_opt, false); + if (data) { + pfree(data_opt.data); + PG_RETURN_TEXT_P(cstring_to_text(data)); + } + } + + PG_RETURN_TEXT_P(cstring_to_text(data_opt.data)); +} + +Datum +gms_tcp_open_connection(PG_FUNCTION_ARGS) +{ + text *remote_host_t = PG_GETARG_TEXT_P(0); + char *remote_host = NULL; + struct sockaddr_in saddr; + int remote_port = PG_GETARG_INT32(1); + text *local_host_t = PG_GETARG_TEXT_P(2); + char *local_host = NULL; + int local_port = PG_GETARG_INT32(3); + int in_buffer_size = PG_GETARG_INT32(4); + int out_buffer_size = PG_GETARG_INT32(5); + text *charset_t = PG_GETARG_TEXT_P(6); + char *charset = NULL; + text *newline_t = PG_GETARG_TEXT_P(7); + char *newline = NULL; + int tx_timeout = PG_GETARG_INT32(8); + GMS_TCP_CONNECTION_HEAD *c_h = NULL; + GMS_TCP_CONNECTION *c = NULL; + GMS_TCP_CONNECTION_STATE *c_state = NULL; + int offset = 0; + + c = (GMS_TCP_CONNECTION *)palloc0(sizeof(GMS_TCP_CONNECTION)); + c_h = &c->c_h; + + if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(remote_host_t, &remote_host, GMS_TCP_MAX_HOST_LEN, &c_h->remote_host_len, &c_h->total_len)) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("input remote host too long, max length is %d.", GMS_TCP_MAX_HOST_LEN))); + } + if (!remote_host) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("input error remote host."))); + } + if (!gms_tcp_change_remote_host(remote_host, &saddr)) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("input remote host error."))); + } + + if (remote_port) { + c_h->remote_port_len = sizeof(c_h->remote_port_len); + c_h->total_len += c_h->remote_port_len; + } else { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("input remote port error."))); + } + + if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(local_host_t, &local_host, GMS_TCP_MAX_HOST_LEN, &c_h->local_host_len, &c_h->total_len)) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("input local host too long, max length is %d.", GMS_TCP_MAX_HOST_LEN))); + } + + if (local_port) { + c_h->local_port_len = sizeof(c_h->local_port_len); + c_h->total_len += c_h->local_port_len; + } + + if (in_buffer_size > GMS_TCP_MAX_IN_BUFFER_SIZE) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("in buffer size must be limited in %d.", GMS_TCP_MAX_IN_BUFFER_SIZE))); + } else if (in_buffer_size) { + c_h->in_buffer_size_len = sizeof(c_h->in_buffer_size_len); + c_h->total_len += c_h->in_buffer_size_len; + } + + if (out_buffer_size > GMS_TCP_MAX_OUT_BUFFER_SIZE) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("in buffer size must be limited in %d.", GMS_TCP_MAX_OUT_BUFFER_SIZE))); + } else if (out_buffer_size) { + c_h->out_buffer_size_len = sizeof(c_h->out_buffer_size_len); + c_h->total_len += c_h->out_buffer_size_len; + } + + if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(charset_t, &charset, GMS_TCP_MAX_CHARSET_LEN, &c_h->charset_len, &c_h->total_len)) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("input charset too long, max length is %d.", GMS_TCP_MAX_CHARSET_LEN))); + } + + if (charset && !gms_tcp_check_charset(charset)) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("error charset: %s.", charset))); + } + + if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(newline_t, &newline, 16, &c_h->newline_len, &c_h->total_len)) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("input newline too long, max length is %d.", GMS_TCP_MAX_NEWLINE_LEN))); + } + + if (tx_timeout > GMS_TCP_MAX_TX_TIMEOUT || tx_timeout < 0) { + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_BAD_ARGUMENT), + errmsg("tx timeout must be limited in %d ~ %d.", 0, GMS_TCP_MAX_TX_TIMEOUT))); + } else if (tx_timeout) { + c_h->tx_timeout_len = sizeof(c_h->tx_timeout_len); + c_h->total_len += c_h->tx_timeout_len; + } + + + Assert(c_h->total_len); + + gms_tcp_comm_memcopy_str(c->data + offset, c_h->remote_host_len, remote_host, &offset); + gms_tcp_comm_memcopy_int(c->data + offset, c_h->remote_port_len, (char *)&remote_port, c_h->remote_port_len, &offset); + gms_tcp_comm_memcopy_str(c->data + offset, c_h->local_host_len, local_host, &offset); + gms_tcp_comm_memcopy_int(c->data + offset, c_h->local_port_len, (char *)&local_port, c_h->local_port_len, &offset); + gms_tcp_comm_memcopy_int(c->data + offset, c_h->in_buffer_size_len, (char *)&in_buffer_size, c_h->in_buffer_size_len, &offset); + gms_tcp_comm_memcopy_int(c->data + offset, c_h->out_buffer_size_len, (char *)&out_buffer_size, c_h->out_buffer_size_len, &offset); + gms_tcp_comm_memcopy_str(c->data + offset, c_h->charset_len, charset, &offset); + gms_tcp_comm_memcopy_str(c->data + offset, c_h->newline_len, newline, &offset); + gms_tcp_comm_memcopy_int(c->data + offset, c_h->tx_timeout_len, (char *)&tx_timeout, c_h->tx_timeout_len, &offset); + + c_state = (GMS_TCP_CONNECTION_STATE *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_STATE)); + gms_tcp_get_connection_info(c, &c_state->c_info); + c_state->c_info.saddr = saddr; + gms_tcp_init_data_buffer(&c_state->in_buffer, in_buffer_size); + gms_tcp_init_data_buffer(&c_state->out_buffer, out_buffer_size); + if (gms_tcp_connect(c_state) != GMS_TCP_OK) { + std::shared_ptr data_cstate = std::shared_ptr(c_state); + gms_tcp_release_connection_state(data_cstate); + pfree(c); + ereport(ERROR, + (errcode(ERRCODE_NETWORK_ERROR), + errmsg("connect to remote(remote host: %s, remote port: %d) fail.", + remote_host, remote_port))); + } + + c_h->fd_len = sizeof(int); + gms_tcp_comm_memcopy_int(c->data + offset, c_h->fd_len, (char *)&c_state->fd, c_h->fd_len, &offset); + c_h->total_len += c_h->fd_len; + + c_state->c_info.fd = c_state->fd; + + gms_tcp_store_connection(c_state); + + PG_RETURN_POINTER(c); +} + +Datum +gms_tcp_write_line(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + text *data_t = PG_GETARG_TEXT_P(1); + char *data = text_to_cstring(data_t); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + char *data_out = NULL; + char *newline = NULL; + int newline_len = 0; + int32 len = 0; + GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; + errno_t rc = EOK; + + gms_tcp_check_connection_null(c_state); + + data_out = (char *)palloc0(GMS_TCP_MAX_OUT_BUFFER_SIZE); + + Assert(c_state->c_info.newline); + if (strcmp(c_state->c_info.newline, "CRLF") == 0) { + newline_len = 2; + newline = "\r\n"; + } else { + newline_len = 1; + newline = "\n"; + } + + rc = memcpy_s(data_out, GMS_TCP_MAX_OUT_BUFFER_SIZE, data, strlen(data)); + securec_check_c(rc, "\0", "\0"); + data_opt.data = data_out; + data_opt.data_len = strlen(data_out); + rc = memcpy_s(&data_out[data_opt.data_len], GMS_TCP_MAX_OUT_BUFFER_SIZE - data_opt.data_len, newline, newline_len); + securec_check_c(rc, "\0", "\0"); + data_opt.data_len += newline_len; + + data_opt.ch_charset = true; + data_opt.encrypt = true; + + len = gms_tcp_write_data(c_state, &data_opt); + + PG_RETURN_INT32(len); +} + +Datum +gms_tcp_write_raw_real(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + bytea *wbuf = PG_GETARG_BYTEA_P(1); + int32 write_len = PG_GETARG_INT32(2); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; + int32 len = 0; + + gms_tcp_check_connection_null(c_state); + + data_opt.data = VARDATA(wbuf); + data_opt.data_len = VARSIZE(wbuf) - VARHDRSZ; + data_opt.len = write_len; + + len = gms_tcp_write_data(c_state, &data_opt); + + PG_RETURN_INT32(len); +} + +Datum +gms_tcp_write_text_real(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + text *data_t = PG_GETARG_TEXT_P(1); + int32 write_len = PG_GETARG_INT32(2); + char *data = text_to_cstring(data_t); + GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); + int32 len = 0; + GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; + + gms_tcp_check_connection_null(c_state); + + data_opt.data = data; + data_opt.data_len = strlen(data); + data_opt.len = write_len; + data_opt.ch_charset = true; + data_opt.encrypt = false; + + len = gms_tcp_write_data(c_state, &data_opt); + + PG_RETURN_INT32(len); +} + + +Datum +gms_tcp_connection_in(PG_FUNCTION_ARGS) +{ + PG_RETURN_NULL(); +} + +Datum +gms_tcp_connection_out(PG_FUNCTION_ARGS) +{ + GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); + GMS_TCP_CONNECTION_INFO *c_info = NULL; + StringInfoData str; + + if(!c) { + PG_RETURN_NULL(); + } + + c_info = (GMS_TCP_CONNECTION_INFO *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_INFO)); + gms_tcp_get_connection_info(c, c_info); + + initStringInfo(&str); + + if (c_info->remote_host) { + appendStringInfo(&str, "remote host:"); + appendStringInfo(&str, "%s", c_info->remote_host); + } + if (c_info->remote_port) { + appendStringInfoChar(&str, ','); + appendStringInfo(&str, "remote port:"); + appendStringInfo(&str, "%d", c_info->remote_port); + } + if (c_info->local_host) { + appendStringInfo(&str, "local host:"); + appendStringInfo(&str, "%s", c_info->local_host); + } + if (c_info->local_port) { + appendStringInfoChar(&str, ','); + appendStringInfo(&str, "local port:"); + appendStringInfo(&str, "%d", c_info->local_port); + } + if (c_info->in_buffer_size) { + appendStringInfoChar(&str, ','); + appendStringInfo(&str, "in buffer size:"); + appendStringInfo(&str, "%d", c_info->in_buffer_size); + } + if (c_info->out_buffer_size) { + appendStringInfoChar(&str, ','); + appendStringInfo(&str, "out buffer size:"); + appendStringInfo(&str, "%d", c_info->out_buffer_size); + } + if (c_info->charset) { + appendStringInfoChar(&str, ','); + appendStringInfo(&str, "charset:"); + appendStringInfo(&str, "%s", c_info->charset); + } + if (c_info->newline && (strcmp(c_info->newline, "CRLF") != 0)) { + appendStringInfoChar(&str, ','); + appendStringInfo(&str, "newline:"); + appendStringInfo(&str, "%s", c_info->newline); + } + if (c_info->tx_timeout && (c_info->tx_timeout != GMS_TCP_MAX_TX_TIMEOUT)) { + appendStringInfoChar(&str, ','); + appendStringInfo(&str, "tx timeout:"); + appendStringInfo(&str, "%d", c_info->tx_timeout); + } + + gms_tcp_release_connection_info(c_info); + pfree(c_info); + + PG_RETURN_CSTRING(str.data); +} + +static void gms_tcp_comm_memcopy_str(char *dst, int dst_len, char *src, int *offset) +{ + errno_t rc = EOK; + + if (src) { + int len = 0; + int src_len = strlen(src); + rc = memcpy_s(dst, dst_len, src, src_len); + securec_check_c(rc, "\0", "\0"); + len += src_len; + dst[src_len] = '\0'; + len++; + + if (offset) { + *offset += len; + } + } +} + +static void gms_tcp_comm_memcopy_int(char *dst, int dst_len, char *src, int src_len, int *offset) +{ + errno_t rc = EOK; + + if (src_len) { + rc = memcpy_s(dst, dst_len, src, src_len); + securec_check_c(rc, "\0", "\0"); + if (offset) { + *offset += src_len; + } + } +} + +static int gms_tcp_comm_get_str(text *data_t, char **data, int data_len_max, int *get_data_len, int *total_len) +{ + if (data_t) { + char *p = text_to_cstring(data_t); + int data_len = strlen(p) + 1; + if (data_len >= data_len_max) { + return ORAFCE_COMM_STRING_TOO_LONG; + } + if (!(strlen(p) == 1 && p[0] == '0')) { + *get_data_len = data_len; + *total_len += data_len; + *data = p; + } + } + + return 0; +} + +static bool gms_tcp_comm_cmp_str(const char *str_1, const char *str_2) +{ + if ((str_1 && str_2) && (strlen(str_1) == strlen(str_2)) && (memcmp(str_1, str_2, strlen(str_1)) == 0)) { + return true; + } else if (!str_1 && !str_2) { + return true; + } else { + return false; + } +} diff --git a/contrib/gms_tcp/gms_tcp.h b/contrib/gms_tcp/gms_tcp.h new file mode 100644 index 000000000..b2729b716 --- /dev/null +++ b/contrib/gms_tcp/gms_tcp.h @@ -0,0 +1,135 @@ +#ifndef __GMS_TCP__ +#define __GMS_TCP__ + +#include +#include +#include +#include +#include + +#include "postgres.h" +#include "knl/knl_thread.h" +#include "utils/builtins.h" +#include "utils/portal.h" +#include "utils/palloc.h" +#include "libpq/pqformat.h" +#include "utils/elog.h" + +#define GMS_TCP_MAX_HOST_LEN 255 +#define GMS_TCP_MAX_CHARSET_LEN 32 +#define GMS_TCP_MAX_NEWLINE_LEN 16 +#define GMS_TCP_MAX_IN_BUFFER_SIZE 32767 +#define GMS_TCP_MAX_OUT_BUFFER_SIZE 32767 +#define GMS_TCP_MAX_CONNECTION 32 +#define GMS_TCP_MAX_PASSWORD_LEN 32 +#define GMS_TCP_MAX_PATH_LEN 255 +#define GMS_TCP_MAX_TX_TIMEOUT 2147483647 + +#define GMS_TCP_FD_BAK_LIST *u_sess->exec_cxt.portal_data_list + +typedef enum { + GMS_TCP_CONNECT_OK = 0, + GMS_TCP_CONNECT_FAIL +} GMS_TCP_CONNECT_STATE; + +typedef enum { + GMS_TCP_OK = 0, + GMS_TCP_STRING_TOO_LONG +} GMS_TCP_CONNECT_ERROR; + +typedef struct { + int32 total_len; + int32 remote_host_len; + int32 remote_port_len; + int32 local_host_len; + int32 local_port_len; + int32 in_buffer_size_len; + int32 out_buffer_size_len; + int32 charset_len; + int32 newline_len; + int32 tx_timeout_len; + int32 fd_len; +} GMS_TCP_CONNECTION_HEAD; + +#define GMS_TCP_MAX_TYPE_LEN 512 +#define GMS_TCP_MAX_TYPE_DATA_LEN (GMS_TCP_MAX_TYPE_LEN - sizeof(GMS_TCP_CONNECTION_HEAD)) +typedef struct { + GMS_TCP_CONNECTION_HEAD c_h; + char data[GMS_TCP_MAX_TYPE_DATA_LEN]; +} GMS_TCP_CONNECTION; + +typedef struct { + char *remote_host; + struct sockaddr_in saddr; + int32 remote_port; + char *local_host; + int32 local_port; + int32 in_buffer_size; + int32 out_buffer_size; + char *charset; + char *newline; + bool get_data_wait; + int32 tx_timeout; + bool available_wait; + int32 available_timeout; + int fd; +} GMS_TCP_CONNECTION_INFO; + +typedef struct { + int32 max_buffer_size; + int32 free_space; + int32 start; + int32 end; + char *data; +} GMS_TCP_CONNECTION_BUFFER; + +typedef struct { + int fd; + bool secure; + GMS_TCP_CONNECT_STATE state; + GMS_TCP_CONNECTION_BUFFER in_buffer; + GMS_TCP_CONNECTION_BUFFER out_buffer; + GMS_TCP_CONNECTION_INFO c_info; +} GMS_TCP_CONNECTION_STATE; + +typedef struct { + bool remove_crlf; + bool peek; + bool report_err; + bool ch_charset; + bool encrypt; + int32 len; + char *data; + int32 data_len; +} GMS_TCP_CONNECTION_DATA_OPT; + +typedef enum { + ORAFCE_COMM_OK = 0, + ORAFCE_COMM_STRING_TOO_LONG +} ORAFCE_COMM_ERROR; + +extern void gms_tcp_release_connection_info(GMS_TCP_CONNECTION_INFO *c_info); +extern void gms_tcp_get_connection_info(GMS_TCP_CONNECTION *c, GMS_TCP_CONNECTION_INFO *c_info); +extern GMS_TCP_CONNECTION_STATE *gms_tcp_get_connection_state_by_fd(int fd); +extern void gms_tcp_get_connection_head_by_info(GMS_TCP_CONNECTION_INFO *c_info, GMS_TCP_CONNECTION_HEAD *c_h); + +/* + * External declarations + */ +extern "C" Datum gms_tcp_crlf(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_available_real(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_close_all_connections(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_close_connection(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_flush(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_get_line_real(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_get_raw_real(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_get_text_real(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_open_connection(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_secure_connection(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_no_secure_connection(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_write_line(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_write_raw_real(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_write_text_real(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_connection_in(PG_FUNCTION_ARGS); +extern "C" Datum gms_tcp_connection_out(PG_FUNCTION_ARGS); +#endif \ No newline at end of file diff --git a/contrib/gms_tcp/gms_tcp_test/gms_tcp_server.class b/contrib/gms_tcp/gms_tcp_test/gms_tcp_server.class new file mode 100644 index 0000000000000000000000000000000000000000..93da9fc5e34bf9f99c6f5088ed5ac7ad8a6f2c48 GIT binary patch literal 4436 zcmX^0Z`VEs1_l#`eOwF<3{mV1(Hsmh91O7>3~?L`@f-{ZTnyd}iChd!3`rae$s7zR zAZ98DLmG%q2N4-u3_c8*oD5kE+3XBCTns`CQ5+1pAUY33&d>lN8aWu6I2f9_80;BZ zI2c+%bQ=dlJBaS!VCV$VT^tPEAi9Txp%+B=aWM3Q=m{JQ6G8MO4u;7fdI|@_R1iIl zgJC)sg9O72c7~bk46_&+*laTMGE3|j8JIOR!x$M@obyvs85#I}GV@aXN^_G^i$W5U za#9%?1k-bi<4ckY;xqH&lSW?Q=l5*?gxuObPBmK};hW;@_OaymbBGlA_GKbZgBpc82+k3|ztarA5i9ZkeC}6oy$=oLW?tTBHXG zIUWW_1}8=a0T5ds$pA(Mj>N>o#H6I8Bt`~~##a(-T3YH|rgW+%fg9){fvdl(sbic1oUN)$>` zi%YB&KrsUK9S?&qgBz3&at;rJABfKm@_kWaIS+$Bh|3B04i7^BLI9#65G23>Rsq%# z0^)OK<|#n39uI>nNPr8RZoxsq!w`fd0x={QB*Icynpwib;00ob|h<4KSWMp91h<5Vy1eG{kMX3cjiOH#sIXR3B7SQqvDW^dT zD`XK(%>2yAz?NEAnwV3}$RLVr2E=35no*G0cB(8%EoNk3(a;2WlocF1P)CKK1W0gY zaY=_&w zK!AsV5md-BFfuqZFfhn6FfuSQFfg!cZD(NI$iTqB$l$`jz`zC;WM|+6tB#VsEHU|D}3<8@N1a~tCMQ&#h z782RUAPSKX-NqocjX_+7C4r;BcKQERR;s57!Nc(5~g{$>buVrP(GVc-On6wnZ;V_;!mWnf@X6L8qW;VUHR2lktg z)HVj`?F=$jEZUMR5N|AJVD;Mwav>{&0)sN#g-Q$@3=9nF46F=V3>*yF4B`wr3@vFsL(FF=#MYGiWl{ zBHW^ma0?UMEq)Ac7;cfH&@E03%nYs!+zf6EDh!?s>J07-8Vnu`nhah9+~N;+%R~ki zaJV=MaP8rUWM&D8WMK`7WMvD9WMdDBWakKpD#Bi4@=v;0%cr z;1u8r0Xts+9$aP&)(m#=;F4lsV_;zLXE0+3Vz6ciWw2uiX0T@nVQ^pwLpaY45mbn1 z3xGSXjsX^J@?g8LBzhvFEt-LuA&!BYA)Y~nA&EhqA%Q`IA(26oAsOKotkD*T#VraH zx+Q~wnIW5jn<0lmg(06oogtS&gCUPWlc9isTY}(jsbydVXDwB5*3#11#=v0*%3V6l zvY>*4WfOxSD07L!qlT41o$so>9#URg6%b>(i&7jOs!=S=Y zhj5A#wkQgwu2Y&BSQ*+FxEb0R#2Gpm>V#CcF^GfWPtA%IM5$Y`f#TVUU6NIj9aKwjfZ|$`omrFv z6x*DVoUm$w6RAwo0M!Vb@Ip&|1!LP?A$f8(BP9a@n zF}>vsGbK5N^pV93mNT?UatawDiy19vD3s(BGDa3NSV3wW9z_y%$4-|m5+Ze=` z|J%Qufx}088-v|O1|9}uhBihg1__2vhFM^GF2g!7y^-M%m_EvI4NTu;cm<~4GW-M4 zjEsyzU|N(>4NPk?nu2KyMtcy=z`z*E7z58q_ZZ9>7#QX;urSPL;AL3AAj+_aL7ZVR zgDk@m233Y-3_1+U8H^cLGgvaLVQ^wt%Mie@jv<<1JwqwOMuvKZO$==en;AM8wlGX) z*vc@AVH?9-hV2Z?8FnzNW7x^Ckzp6ZZid|qhZy!S9A((caGqfw!!?He3^y4LFg#{B z$nc8c5W`!B!wlaUjxhXVILgS#aEy_g;W(oZ!wE)FhLeo)45t{?7)~>4GMr&FW;n}e z%5aX+g5d(AJ;OytCx*+6E(})~-5G8$dNbT)^kcZg7{+jyF_Pf{V+_MR##n~?jEM{n z!D%~zLFg}oC?h+A;(rEN26lD^PY|1togo}j<1#S(V+dtu@L*(TU=oG`zW)rI*pz~5 cW)`;p3|ydk(UXAz(rRF2h+tq~U}A^_0KrhO00000 literal 0 HcmV?d00001 diff --git a/contrib/gms_tcp/gms_tcp_test/gms_tcp_server.java b/contrib/gms_tcp/gms_tcp_test/gms_tcp_server.java new file mode 100644 index 000000000..1b2c046b4 --- /dev/null +++ b/contrib/gms_tcp/gms_tcp_test/gms_tcp_server.java @@ -0,0 +1,226 @@ +import java.io.InputStream; +import java.io.OutputStream; +import java.net.*; +import java.nio.charset.Charset; + +public +class gms_tcp_server { + +public + static void gms_tcp_in_buffer(InputStream inputStream, OutputStream outputStream) + { + int i = 0; + byte[] buffer = new byte[256]; + String[] data = {"aaaabbbb", + "ccccdddd", + "eeeeffff", + "gggghhhh", + "01234567"}; + while (true) { + try { + int n = inputStream.read(buffer); + String inputMsg = new String(buffer, 0, n); + inputMsg = inputMsg.replaceAll("[\n\r]", ""); + + if (inputMsg.equals("ok")) { + outputStream.write(data[i].getBytes()); + System.out.println("Send msg: " + data[i]); + i = (i + 1) % 5; + } + } catch (Exception e) { + break; + } + } + } + +public + static void gms_tcp_char_set(InputStream inputStream, OutputStream outputStream) + { + byte[] inputMsg = new byte[64]; + + try { + String outString = "abcdefg"; + String outMsg = new String(outString.getBytes("GBK")); + outputStream.write(outMsg.getBytes()); + } catch (Exception e) { + return; + } + } + +public + static void gms_tcp_get_line(InputStream inputStream, OutputStream outputStream) + { + String outMsg = "get line, abcdefg1234567890\n"; + + try { + outputStream.write(outMsg.getBytes()); + System.out.println("Send msg: " + outMsg); + } catch (Exception e) { + return; + } + } + +public + static void gms_tcp_get_text(InputStream inputStream, OutputStream outputStream) + { + String outMsg = "get text, abcdefg1234567890\n"; + + try { + outputStream.write(outMsg.getBytes()); + System.out.println("Send msg: " + outMsg); + } catch (Exception e) { + return; + } + } + +public + static void gms_tcp_get_raw(InputStream inputStream, OutputStream outputStream) + { + byte[] outMsg = {1,2,3,4,5,6,7,8,9,10}; + + try { + outputStream.write(outMsg); + } catch (Exception e) { + return; + } + } + +public + static void gms_tcp_read_line(InputStream inputStream, OutputStream outputStream) + { + String outMsg = "read line, abcdefg1234567890\n"; + + try { + outputStream.write(outMsg.getBytes()); + System.out.println("Send msg: " + outMsg); + } catch (Exception e) { + return; + } + } + +public + static void gms_tcp_read_text(InputStream inputStream, OutputStream outputStream) + { + String outMsg = "read text, abcdefg1234567890\n"; + + try { + outputStream.write(outMsg.getBytes()); + System.out.println("Send msg: " + outMsg); + } catch (Exception e) { + return; + } + } + +public + static void gms_tcp_write_line(InputStream inputStream, OutputStream outputStream) + { + byte[] inputMsg = new byte[64]; + + try { + int n = inputStream.read(inputMsg); + String inputMsgString = new String(inputMsg, 0, n); + System.out.println(inputMsgString); + } catch (Exception e) { + return; + } + } + +public + static void gms_tcp_write_text(InputStream inputStream, OutputStream outputStream) + { + byte[] inputMsg = new byte[64]; + + try { + int n = inputStream.read(inputMsg); + String inputMsgString = new String(inputMsg, 0, n); + System.out.println(inputMsgString); + } catch (Exception e) { + return; + } + } + +public + static void gms_tcp_read_raw(InputStream inputStream, OutputStream outputStream) + { + byte[] outMsg = {1,2,3,4,5,6,7,8,9,10}; + + try { + outputStream.write(outMsg); + } catch (Exception e) { + return; + } + } + +public + static void main(String[] args) + { + try { + int port = 12358; + ServerSocket serverSocket = new ServerSocket(port); + System.out.println("Server is ok."); + while (true) { + boolean quit = false; + Socket clientSocket = serverSocket.accept(); + System.out.println("Client connect ok."); + + InputStream inputStream = clientSocket.getInputStream(); + OutputStream outputStream = clientSocket.getOutputStream(); + + while (true) { + try { + byte[] testType = new byte[64]; + int n = inputStream.read(testType); + String testTypeString = new String(testType, 0, n); + testTypeString = testTypeString.replaceAll("[\n\r]", ""); + System.out.println("start test: " + testTypeString); + + if (testTypeString.equals("get line")) { + gms_tcp_get_line(inputStream, outputStream); + break; + } else if (testTypeString.equals("get text")) { + gms_tcp_get_text(inputStream, outputStream); + break; + } else if (testTypeString.equals("get raw")) { + gms_tcp_get_raw(inputStream, outputStream); + break; + } else if (testTypeString.equals("read line")) { + gms_tcp_read_line(inputStream, outputStream); + break; + } else if (testTypeString.equals("read text")) { + gms_tcp_read_text(inputStream, outputStream); + break; + } else if (testTypeString.equals("read raw")) { + gms_tcp_read_raw(inputStream, outputStream); + break; + } else if (testTypeString.equals("in buffer")) { + gms_tcp_in_buffer(inputStream, outputStream); + break; + } else if (testTypeString.equals("write line")) { + gms_tcp_write_line(inputStream, outputStream); + break; + } else if (testTypeString.equals("write text")) { + gms_tcp_write_text(inputStream, outputStream); + break; + } else if (testTypeString.equals("quit")) { + quit = true; + break; + } else { + gms_tcp_char_set(inputStream, outputStream); + break; + } + } catch (Exception e) { + break; + } + } + + if (quit) { + break; + } + + System.out.println("one client over\n===================================================================\n"); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/contrib/gms_tcp/input/gms_tcp_server.source b/contrib/gms_tcp/input/gms_tcp_server.source new file mode 100644 index 000000000..e615462fe --- /dev/null +++ b/contrib/gms_tcp/input/gms_tcp_server.source @@ -0,0 +1,2 @@ +\! chmod -R 700 @abs_bindir@/../jre +\! @abs_bindir@/../jre/bin/java -cp $CLASSPATH:@abs_builddir@/../../../src/test/regress/jdbc_test/gsjdbc400.jar:@abs_builddir@/gms_tcp_test/. gms_tcp_server diff --git a/contrib/gms_tcp/output/gms_tcp_server.source b/contrib/gms_tcp/output/gms_tcp_server.source new file mode 100644 index 000000000..e71cf2784 --- /dev/null +++ b/contrib/gms_tcp/output/gms_tcp_server.source @@ -0,0 +1,75 @@ +\! chmod -R 700 @abs_bindir@/../jre +\! @abs_bindir@/../jre/bin/java -cp $CLASSPATH:@abs_builddir@/../../../src/test/regress/jdbc_test/gsjdbc400.jar:@abs_builddir@/gms_tcp_test/. gms_tcp_server +Server is ok. +Client connect ok. +start test: in buffer +Send msg: aaaabbbb +Send msg: ccccdddd +Send msg: eeeeffff +Send msg: gggghhhh +Send msg: 01234567 +Send msg: aaaabbbb +Send msg: ccccdddd +Send msg: eeeeffff +Send msg: gggghhhh +one client over +=================================================================== + +Client connect ok. +start test: get line +Send msg: get line, abcdefg1234567890 + +one client over +=================================================================== + +Client connect ok. +start test: get text +Send msg: get text, abcdefg1234567890 + +one client over +=================================================================== + +Client connect ok. +start test: get raw +one client over +=================================================================== + +Client connect ok. +start test: read line +Send msg: read line, abcdefg1234567890 + +one client over +=================================================================== + +Client connect ok. +start test: read text +Send msg: read text, abcdefg1234567890 + +one client over +=================================================================== + +Client connect ok. +start test: read raw +one client over +=================================================================== + +Client connect ok. +start test: write line +0123456789 + +one client over +=================================================================== + +Client connect ok. +start test: write text +012345 +one client over +=================================================================== + +Client connect ok. +start test: char set +one client over +=================================================================== + +Client connect ok. +start test: quit diff --git a/contrib/gms_tcp/parallel_schedule b/contrib/gms_tcp/parallel_schedule new file mode 100644 index 000000000..4ab975f8b --- /dev/null +++ b/contrib/gms_tcp/parallel_schedule @@ -0,0 +1 @@ +test: gms_tcp_server gms_tcp_client diff --git a/contrib/gms_tcp/sql/gms_tcp.sql b/contrib/gms_tcp/sql/gms_tcp.sql new file mode 100644 index 000000000..042252b56 --- /dev/null +++ b/contrib/gms_tcp/sql/gms_tcp.sql @@ -0,0 +1 @@ +\dx diff --git a/contrib/gms_tcp/sql/gms_tcp_client.sql b/contrib/gms_tcp/sql/gms_tcp_client.sql new file mode 100644 index 000000000..d61024dd7 --- /dev/null +++ b/contrib/gms_tcp/sql/gms_tcp_client.sql @@ -0,0 +1,615 @@ +-- +--test in_buffer +-- +create extension gms_tcp; +create or replace function gms_tcp_test_in_buffer() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'in buffer'); + pg_sleep(1); + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 5; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 12; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 13; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 12; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 5; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 17; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 9; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + len = 11; + data = gms_tcp.get_text(c, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.write_line(c, 'ok'); + pg_sleep(1); + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_line(c); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when gms_tcp_network_error then + raise info 'caught gms_tcp_network_error'; + gms_tcp.close_all_connections(); + + when gms_tcp_bad_argument then + raise info 'caught gms_tcp_bad_argument'; + gms_tcp.close_all_connections(); + + when gms_tcp_buffer_too_small then + raise info 'caught gms_tcp_buffer_too_small'; + gms_tcp.close_all_connections(); + + when gms_tcp_end_of_input then + raise info 'caught gms_tcp_end_of_input'; + gms_tcp.close_all_connections(); + + when gms_tcp_transfer_timeout then + raise info 'caught gms_tcp_transfer_timeout'; + gms_tcp.close_all_connections(); + + when gms_tcp_partial_multibyte_char then + raise info 'caught gms_tcp_partial_multibyte_char'; + gms_tcp.close_all_connections(); + + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +-- +--test read data +-- +--get line +create or replace function gms_tcp_test_get_line() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'get line'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_line(c, true); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +--get text +create or replace function gms_tcp_test_get_text() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'get text'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_text(c, 17, true); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +--get raw +create or replace function gms_tcp_test_get_raw() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data raw; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'get raw'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_raw(c, 4, true); + raise info 'available: %, rcv: %.', num, data; + end if; + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_raw(c, 8); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +--read line +create or replace function gms_tcp_test_read_line() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'read line'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + gms_tcp.read_line(c, data, len, true); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.available(c,1); + if num > 0 then + gms_tcp.read_line(c, data, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +--read text +create or replace function gms_tcp_test_read_text() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; + out_len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'read text'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + out_len = 18; + gms_tcp.read_text(c, data, len, out_len, true); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +create or replace function gms_tcp_test_read_raw() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data raw; + len integer; + out_len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'read raw'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + out_len = 3; + gms_tcp.read_raw(c, data, len, out_len, true); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.available(c,1); + if num > 0 then + out_len = 4; + gms_tcp.read_raw(c, data, len, out_len, true); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + num = gms_tcp.available(c,1); + if num > 0 then + out_len = 8; + gms_tcp.read_raw(c, data, len, out_len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +--write line +create or replace function gms_tcp_test_write_line() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + newline=>'LF', + tx_timeout=>10); + num = gms_tcp.write_line(c, 'write line'); + pg_sleep(1); + num = gms_tcp.write_line(c, '0123456789'); + + num = gms_tcp.available(c,1); + if num > 0 then + gms_tcp.read_line(c, data, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +--write text +create or replace function gms_tcp_test_write_text() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; + len integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + --out_buffer_size=>20480, + tx_timeout=>10); + num = gms_tcp.write_text(c, 'write text', 10); + pg_sleep(1); + num = gms_tcp.write_text(c, '0123456789', 6); + + num = gms_tcp.available(c,1); + if num > 0 then + gms_tcp.read_line(c, data, len); + raise info 'available: %, rcv: %(%).', num, data, len; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +create or replace function gms_tcp_test_error_in_buffer_size() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>40480, + out_buffer_size=>20480, + newline=>'lf', + tx_timeout=>10); + gms_tcp.close_all_connections(); + +exception + when gms_tcp_network_error then + raise info 'caught gms_tcp_network_error'; + gms_tcp.close_all_connections(); + + when gms_tcp_bad_argument then + raise info 'caught gms_tcp_bad_argument'; + gms_tcp.close_all_connections(); + + when gms_tcp_buffer_too_small then + raise info 'caught gms_tcp_buffer_too_small'; + gms_tcp.close_all_connections(); + + when gms_tcp_end_of_input then + raise info 'caught gms_tcp_end_of_input'; + gms_tcp.close_all_connections(); + + when gms_tcp_transfer_timeout then + raise info 'caught gms_tcp_transfer_timeout'; + gms_tcp.close_all_connections(); + + when gms_tcp_partial_multibyte_char then + raise info 'caught gms_tcp_partial_multibyte_char'; + gms_tcp.close_all_connections(); + + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +create or replace function gms_tcp_test_error_out_buffer_size() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>40480, + newline=>'lf', + tx_timeout=>10); + gms_tcp.close_all_connections(); + +exception + when gms_tcp_network_error then + raise info 'caught gms_tcp_network_error'; + gms_tcp.close_all_connections(); + + when gms_tcp_bad_argument then + raise info 'caught gms_tcp_bad_argument'; + gms_tcp.close_all_connections(); + + when gms_tcp_buffer_too_small then + raise info 'caught gms_tcp_buffer_too_small'; + gms_tcp.close_all_connections(); + + when gms_tcp_end_of_input then + raise info 'caught gms_tcp_end_of_input'; + gms_tcp.close_all_connections(); + + when gms_tcp_transfer_timeout then + raise info 'caught gms_tcp_transfer_timeout'; + gms_tcp.close_all_connections(); + + when gms_tcp_partial_multibyte_char then + raise info 'caught gms_tcp_partial_multibyte_char'; + gms_tcp.close_all_connections(); + + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +-- +--char_set +-- +create or replace function gms_tcp_test_char_set() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; + data varchar2; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + in_buffer_size=>20480, + out_buffer_size=>20480, + cset=>'gbk', + tx_timeout=>10); + num = gms_tcp.write_line(c, 'char set'); + gms_tcp.flush(c); + + num = gms_tcp.available(c,1); + if num > 0 then + data = gms_tcp.get_line(c); + raise info 'available: %, rcv: %.', num, data; + end if; + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +create or replace function gms_tcp_test_quit() + returns void + language plpgsql +as $function$ +declare + c gms_tcp.connection; + num integer; +begin + c = gms_tcp.open_connection(remote_host=>'127.0.0.1', + remote_port=>12358, + tx_timeout=>10); + num = gms_tcp.write_line(c, 'quit'); + + gms_tcp.close_all_connections(); + +exception + when others then + raise info 'caught others'; + gms_tcp.close_all_connections(); +end; +$function$; + +select pg_sleep(5); + +select gms_tcp_test_in_buffer(); +select gms_tcp_test_get_line(); +select gms_tcp_test_get_text(); +select gms_tcp_test_get_raw(); +select gms_tcp_test_read_line(); +select gms_tcp_test_read_text(); +select gms_tcp_test_read_raw(); +select gms_tcp_test_write_line(); +select gms_tcp_test_write_text(); +select gms_tcp_test_error_in_buffer_size(); +select gms_tcp_test_error_out_buffer_size(); +select gms_tcp_test_char_set(); +select gms_tcp_test_quit(); + +drop function gms_tcp_test_in_buffer(); +drop function gms_tcp_test_get_line(); +drop function gms_tcp_test_get_text(); +drop function gms_tcp_test_get_raw(); +drop function gms_tcp_test_read_line(); +drop function gms_tcp_test_read_text(); +drop function gms_tcp_test_read_raw(); +drop function gms_tcp_test_write_line(); +drop function gms_tcp_test_write_text(); +drop function gms_tcp_test_error_in_buffer_size(); +drop function gms_tcp_test_error_out_buffer_size(); +drop function gms_tcp_test_char_set(); +drop function gms_tcp_test_quit(); diff --git a/src/common/backend/utils/errcodes.txt b/src/common/backend/utils/errcodes.txt index bd9235250..e2e89b859 100644 --- a/src/common/backend/utils/errcodes.txt +++ b/src/common/backend/utils/errcodes.txt @@ -639,4 +639,12 @@ Section: Class SE - Security Error SE001 E ERRCODE_INVALID_AUDIT_LOG invalid_audit_log Section: Class SR - Uncorrected Error & warning -SR001 E ERRCODE_SR_RECOVERY_CONFLICT recovery_conflict \ No newline at end of file +SR001 E ERRCODE_SR_RECOVERY_CONFLICT recovery_conflict + +Section: Class UT - gms_tcp +U0001 E ERRCODE_END_OF_INPUT gms_tcp_end_of_input +U0002 E ERRCODE_NETWORK_ERROR gms_tcp_network_error +U0003 E ERRCODE_BAD_ARGUMENT gms_tcp_bad_argument +U0004 E ERRCODE_TRANSFER_TIMEOUT gms_tcp_transfer_timeout +U0005 E ERRCODE_PARTIAL_MULTIBYTE_CHAR gms_tcp_partial_multibyte_char +U0006 E ERRCODE_BUFFER_TOO_SMALL gms_tcp_buffer_too_small diff --git a/src/common/backend/utils/mmgr/portalmem.cpp b/src/common/backend/utils/mmgr/portalmem.cpp index 223f6dcfa..f50a73a0b 100755 --- a/src/common/backend/utils/mmgr/portalmem.cpp +++ b/src/common/backend/utils/mmgr/portalmem.cpp @@ -31,6 +31,7 @@ #include "utils/plpgsql.h" #include "utils/timestamp.h" #include "utils/resowner.h" +#include "utils/palloc.h" #include "nodes/execnodes.h" #include "opfusion/opfusion.h" @@ -274,6 +275,7 @@ Portal CreatePortal(const char* name, bool allowDup, bool dupSilent, bool is_fro portal->isAutoOutParam = false; portal->isPkgCur = false; #endif + portal->specialDataList = NULL; /* put portal in table (sets portal->name) */ PortalHashTableInsert(portal, name); @@ -530,6 +532,25 @@ void MarkPortalFailed(Portal portal) } } +void PortalReleaseSpecialData(Portal portal) +{ + ListCell *lc = NULL; + + if (!list_length(portal->specialDataList)) { + return; + } + + foreach (lc, portal->specialDataList) { + PortalSpecialData *specialData = (PortalSpecialData *)lfirst(lc); + if (specialData->cleanup) { + specialData->cleanup(specialData->data); + } + pfree(specialData); + } + + list_free_ext(portal->specialDataList); +} + /* * PortalDrop * Destroy the portal. @@ -643,6 +664,8 @@ void PortalDrop(Portal portal, bool isTopCommit) #endif MemoryContextDelete(portal->holdContext); + PortalReleaseSpecialData(portal); + /* release subsidiary storage */ MemoryContextDelete(PortalGetHeapMemory(portal)); diff --git a/src/gausskernel/process/tcop/pquery.cpp b/src/gausskernel/process/tcop/pquery.cpp index ddbe19288..341ce3b6c 100644 --- a/src/gausskernel/process/tcop/pquery.cpp +++ b/src/gausskernel/process/tcop/pquery.cpp @@ -1017,6 +1017,7 @@ bool PortalRun( MemoryContext savePortalContext; MemoryContext saveMemoryContext; errno_t errorno = EOK; + List **savePortalDataList; AssertArg(PortalIsValid(portal)); AssertArg(PointerIsValid(portal->commandTag)); @@ -1094,6 +1095,7 @@ bool PortalRun( saveResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner; savePortalContext = t_thrd.mem_cxt.portal_mem_cxt; saveMemoryContext = CurrentMemoryContext; + savePortalDataList = u_sess->exec_cxt.portal_data_list; u_sess->attr.attr_sql.create_index_concurrently = false; @@ -1125,6 +1127,7 @@ bool PortalRun( needResetErrMsg = stp_disable_xact_and_set_err_msg(&savedisAllowCommitRollback, STP_XACT_TOO_MANY_PORTAL); } t_thrd.mem_cxt.portal_mem_cxt = PortalGetHeapMemory(portal); + u_sess->exec_cxt.portal_data_list = &portal->specialDataList; MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt); @@ -1221,6 +1224,7 @@ bool PortalRun( t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner; } t_thrd.mem_cxt.portal_mem_cxt = savePortalContext; + u_sess->exec_cxt.portal_data_list = savePortalDataList; ereport(DEBUG3, (errmodule(MOD_NEST_COMPILE), errcode(ERRCODE_LOG), errmsg("%s clear curr_compile_context because of error.", __func__))); @@ -1269,6 +1273,7 @@ bool PortalRun( t_thrd.utils_cxt.CurrentResourceOwner = saveResourceOwner; } t_thrd.mem_cxt.portal_mem_cxt = savePortalContext; + u_sess->exec_cxt.portal_data_list = savePortalDataList; if (portal->strategy != PORTAL_MULTI_QUERY) { PGSTAT_END_TIME_RECORD(EXECUTION_TIME); diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 663efea7e..406e21987 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -193,6 +193,7 @@ typedef struct knl_u_executor_context { void *EventTriggerState; bool isFlashBack; + List **portal_data_list; } knl_u_executor_context; typedef struct knl_u_sig_context { diff --git a/src/include/utils/palloc.h b/src/include/utils/palloc.h index bcf1919bc..283dabf6c 100644 --- a/src/include/utils/palloc.h +++ b/src/include/utils/palloc.h @@ -379,6 +379,11 @@ public: } }; +typedef struct { + std::shared_ptr data; + void (*cleanup)(std::shared_ptr data); /* cleanup hook */ +} PortalSpecialData; + /* *It is used for delete object whose destructor is null and free memory in Destroy() *_objptr can't include type change, for example (A*)b, that will lead to compile error in (_objptr) = NULL diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h index 4aa18f6de..b6f532332 100644 --- a/src/include/utils/portal.h +++ b/src/include/utils/portal.h @@ -263,6 +263,11 @@ typedef struct PortalData { bool isPkgCur; /* cursor variable is a package variable? */ #endif int nextval_default_expr_type; /* nextval does not support lightproxy and sqlbypass */ + List *specialDataList; + /* + * A specific data link list hung under the portal. + * The special data mounted in the portal can be used during cleaning,such as utl_tcp. + */ } PortalData; /*