/* * This code implements the functons of gms_tcp */ #include "gms_tcp.h" PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(gms_tcp_crlf); PG_FUNCTION_INFO_V1(gms_tcp_available_real); PG_FUNCTION_INFO_V1(gms_tcp_close_all_connections); PG_FUNCTION_INFO_V1(gms_tcp_close_connection); PG_FUNCTION_INFO_V1(gms_tcp_flush); PG_FUNCTION_INFO_V1(gms_tcp_get_line_real); PG_FUNCTION_INFO_V1(gms_tcp_get_raw_real); PG_FUNCTION_INFO_V1(gms_tcp_get_text_real); PG_FUNCTION_INFO_V1(gms_tcp_open_connection); PG_FUNCTION_INFO_V1(gms_tcp_write_line); PG_FUNCTION_INFO_V1(gms_tcp_write_raw_real); PG_FUNCTION_INFO_V1(gms_tcp_write_text_real); PG_FUNCTION_INFO_V1(gms_tcp_connection_in); PG_FUNCTION_INFO_V1(gms_tcp_connection_out); static void *gms_tcp_alloc(int len); static inline List *gms_tcp_lappend(List* list, void* datum); static inline void gms_tcp_init_data_buffer(GMS_TCP_CONNECTION_BUFFER *c_buffer, int32 buffer_size); static bool gms_tcp_get_addr_by_hostname(char *remotehost, struct sockaddr_in *saddr); static bool gms_tcp_change_remote_host(char *remotehost, struct sockaddr_in *saddr); static int gms_tcp_connect(GMS_TCP_CONNECTION_STATE *c_state); static void gms_tcp_store_connection(GMS_TCP_CONNECTION_STATE *c_state); static GMS_TCP_CONNECTION_STATE *gms_tcp_get_connection_state(GMS_TCP_CONNECTION *c); static void gms_tcp_put_data_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_in, int32 data_len); static int32 gms_tcp_put_data_to_out_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_out, int32 data_len); static void gms_tcp_release_connection_buffer(GMS_TCP_CONNECTION_BUFFER *buffer); static void gms_tcp_release_connection_state(std::shared_ptr data); static void gms_tcp_remove_newline(GMS_TCP_CONNECTION_STATE *c_state, char *data, int32 data_len); static int32 gms_tcp_get_data_from_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt); static void gms_tcp_close_connection_by_state(GMS_TCP_CONNECTION_STATE *c_state); static int32 gms_tcp_get_available_bytes(GMS_TCP_CONNECTION_STATE *c_state); static inline int gms_tcp_set_connection_rcv_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout); static inline int gms_tcp_set_connection_send_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout); static void gms_tcp_wait(GMS_TCP_CONNECTION_STATE *c_state, bool report_err); static void gms_tcp_get_data_from_connection_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt); static bool gms_tcp_check_charset(char *charset); static inline int32 gms_tcp_get_in_buffer_data_bytes(GMS_TCP_CONNECTION_BUFFER *in_buffer); static char *gms_tcp_encode_data_by_charset(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt, bool is_write); static char *gms_tcp_encrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data); static char *gms_tcp_decrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data); static void gms_tcp_get_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt); static int32 gms_tcp_write_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt); static void gms_tcp_check_connection_null(GMS_TCP_CONNECTION_STATE *c_state); static void gms_tcp_comm_memcopy_str(char *dst, int dst_len, char *src, int *offset); static void gms_tcp_comm_memcopy_int(char *dst, int dst_len, char *src, int src_len, int *offset); static int gms_tcp_comm_get_str(text *data_t, char **data, int data_len_max, int *get_data_len, int *total_len); static bool gms_tcp_comm_cmp_str(const char *str_1, const char *str_2); static void * gms_tcp_alloc(int len) { void *result = NULL; MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt); result = palloc0(len); MemoryContextSwitchTo(old_context); return result; } static inline List * gms_tcp_lappend(List* list, void* datum) { List *result = NULL; MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt); result = lappend(list, datum); MemoryContextSwitchTo(old_context); return result; } static inline List * gms_tcp_lappend_int(List* list, int datum) { List *result = NULL; MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt); result = lappend_int(list, datum); MemoryContextSwitchTo(old_context); return result; } void gms_tcp_get_connection_info(GMS_TCP_CONNECTION *c, GMS_TCP_CONNECTION_INFO *c_info) { GMS_TCP_CONNECTION_HEAD *c_h = &c->c_h; int offset = 0; if (c_h->remote_host_len) { c_info->remote_host = (char *)gms_tcp_alloc(c_h->remote_host_len); gms_tcp_comm_memcopy_str(c_info->remote_host, c_h->remote_host_len, c->data + offset, &offset); } if (c_h->remote_port_len) { gms_tcp_comm_memcopy_int((char *)&c_info->remote_port, c_h->remote_port_len, c->data + offset, c_h->remote_port_len, &offset); } if (c_h->local_host_len) { c_info->local_host = (char *)gms_tcp_alloc(c_h->local_host_len); gms_tcp_comm_memcopy_str(c_info->local_host, c_h->local_host_len, c->data + offset, &offset); } if (c_h->local_port_len) { gms_tcp_comm_memcopy_int((char *)&c_info->local_port, c_h->local_port_len, c->data + offset, c_h->local_port_len, &offset); } if (c_h->in_buffer_size_len) { gms_tcp_comm_memcopy_int((char *)&c_info->in_buffer_size, c_h->in_buffer_size_len, c->data + offset, c_h->in_buffer_size_len, &offset); } if (c_h->out_buffer_size_len) { gms_tcp_comm_memcopy_int((char *)&c_info->out_buffer_size, c_h->out_buffer_size_len, c->data + offset, c_h->out_buffer_size_len, &offset); } if (c_h->charset_len) { c_info->charset = (char *)gms_tcp_alloc(c_h->charset_len); gms_tcp_comm_memcopy_str(c_info->charset, c_h->charset_len, c->data + offset, &offset); } if (c_h->newline_len) { c_info->newline = (char *)gms_tcp_alloc(c_h->newline_len); gms_tcp_comm_memcopy_str(c_info->newline, c_h->newline_len, c->data + offset, &offset); } if (c_h->tx_timeout_len) { gms_tcp_comm_memcopy_int((char *)&c_info->tx_timeout, c_h->tx_timeout_len, c->data + offset, c_h->tx_timeout_len, &offset); } if (c_h->fd_len) { gms_tcp_comm_memcopy_int((char *)&c_info->fd, c_h->fd_len, c->data + offset, c_h->fd_len, &offset); } return; } void gms_tcp_get_connection_head_by_info(GMS_TCP_CONNECTION_INFO *c_info, GMS_TCP_CONNECTION_HEAD *c_h) { if (c_info->remote_host) { c_h->remote_host_len = strlen(c_info->remote_host) + 1; c_h->total_len += c_h->remote_host_len; } if (c_info->remote_port) { c_h->remote_port_len = sizeof(c_info->remote_port); c_h->total_len += c_h->remote_port_len; } if (c_info->local_host) { c_h->local_host_len = strlen(c_info->local_host) + 1; c_h->total_len += c_h->local_host_len; } if (c_info->local_port) { c_h->local_port_len = sizeof(c_info->local_port); c_h->total_len += c_h->local_port_len; } if (c_info->in_buffer_size) { c_h->in_buffer_size_len = sizeof(c_info->in_buffer_size); c_h->total_len += c_h->in_buffer_size_len; } if (c_info->out_buffer_size) { c_h->out_buffer_size_len = sizeof(c_info->out_buffer_size); c_h->total_len += c_h->out_buffer_size_len; } if (c_info->charset) { c_h->charset_len = strlen(c_info->charset) + 1; c_h->total_len += c_h->charset_len; } if (c_info->newline) { c_h->newline_len = strlen(c_info->newline) + 1; c_h->total_len += c_h->newline_len; } if (c_info->tx_timeout) { c_h->tx_timeout_len = sizeof(c_info->tx_timeout); c_h->total_len += c_h->tx_timeout_len; } if (c_info->fd) { c_h->fd_len = sizeof(c_info->fd); c_h->total_len += c_h->fd_len; } return; } static inline void gms_tcp_init_data_buffer(GMS_TCP_CONNECTION_BUFFER *c_buffer, int32 buffer_size) { errno_t rc = EOK; rc = memset_s(c_buffer, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, sizeof(GMS_TCP_CONNECTION_BUFFER)); securec_check_c(rc, "\0", "\0"); if (buffer_size) { c_buffer->max_buffer_size = buffer_size; c_buffer->free_space = buffer_size; c_buffer->data = (char *)gms_tcp_alloc(buffer_size); } } static bool gms_tcp_get_addr_by_hostname(char *remotehost, struct sockaddr_in *saddr) { struct hostent *hptr = gethostbyname(remotehost); if (!hptr) { return false; } if (hptr->h_addrtype == AF_INET) { saddr->sin_addr = *((struct in_addr *)hptr->h_addr_list[0]); return true; } return false; } static bool gms_tcp_change_remote_host(char *remotehost, struct sockaddr_in *saddr) { int ret = 0; ret = inet_aton(remotehost, &saddr->sin_addr); if (!ret) { return gms_tcp_get_addr_by_hostname(remotehost, saddr); } return true; } static int gms_tcp_connect(GMS_TCP_CONNECTION_STATE *c_state) { c_state->fd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in saddr; int res = 0; errno_t rc = EOK; rc = memset_s(&saddr, GMS_TCP_MAX_IN_BUFFER_SIZE, 0,sizeof(saddr)); securec_check_c(rc, "\0", "\0"); saddr.sin_family = AF_INET; saddr.sin_port = htons(c_state->c_info.remote_port); saddr.sin_addr = c_state->c_info.saddr.sin_addr; res = connect(c_state->fd, (struct sockaddr *)&saddr, sizeof(saddr)); if (res) { c_state->fd = 0; return GMS_TCP_CONNECT_FAIL; } c_state->state = GMS_TCP_CONNECT_OK; return GMS_TCP_OK; } static void gms_tcp_store_connection(GMS_TCP_CONNECTION_STATE *c_state) { PortalSpecialData *node; node = (PortalSpecialData *)gms_tcp_alloc(sizeof(PortalSpecialData)); node->data = std::shared_ptr(c_state); node->cleanup = gms_tcp_release_connection_state; GMS_TCP_FD_BAK_LIST = gms_tcp_lappend(GMS_TCP_FD_BAK_LIST, node); } static GMS_TCP_CONNECTION_STATE * gms_tcp_get_connection_state(GMS_TCP_CONNECTION *c) { ListCell *lc = NULL; GMS_TCP_CONNECTION_INFO *c_info = NULL; GMS_TCP_CONNECTION_STATE *result = NULL; c_info = (GMS_TCP_CONNECTION_INFO *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_INFO)); gms_tcp_get_connection_info(c, c_info); foreach (lc, GMS_TCP_FD_BAK_LIST) { PortalSpecialData *node = (PortalSpecialData *)lfirst(lc); GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast(node->data).get(); if (c_info->fd == c_state->fd) { result = c_state; break; } } gms_tcp_release_connection_info(c_info); pfree(c_info); return result; } GMS_TCP_CONNECTION_STATE * gms_tcp_get_connection_state_by_fd(int fd) { ListCell *lc = NULL; GMS_TCP_CONNECTION_STATE *result = NULL; foreach (lc, GMS_TCP_FD_BAK_LIST) { PortalSpecialData *node = (PortalSpecialData *)lfirst(lc); GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast(node->data).get(); if (fd == c_state->fd) { result = c_state; break; } } return result; } static void gms_tcp_put_data_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_in, int32 data_len) { GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer; int32 left_len = 0; error_t errorno = EOK; if (in_buffer->free_space <= data_len) { gms_tcp_close_connection_by_state(c_state); ereport(ERROR, (errcode(ERRCODE_BUFFER_TOO_SMALL), errmsg("input bufer is full."))); } if (in_buffer->end > in_buffer->start) { /* |----start data end----| */ left_len = in_buffer->max_buffer_size - in_buffer->end; if (left_len > data_len) { errorno = memcpy_s(&in_buffer->data[in_buffer->end], left_len, data_in, data_len); securec_check_c(errorno, "\0", "\0"); in_buffer->end += data_len; } else { errorno = memcpy_s(&in_buffer->data[in_buffer->end], left_len, data_in, left_len); securec_check_c(errorno, "\0", "\0"); errorno = memcpy_s(in_buffer->data, in_buffer->start, &data_in[left_len], (data_len - left_len)); securec_check_c(errorno, "\0", "\0"); in_buffer->end = (data_len - left_len); } } else if (in_buffer->start > in_buffer->end){ /* | data end----start data | */ errorno = memcpy_s(&in_buffer->data[in_buffer->end], in_buffer->start - in_buffer->end, data_in, data_len); securec_check_c(errorno, "\0", "\0"); in_buffer->end += data_len; } else { /* in buffer is empty, reset it. */ errorno = memset_s(in_buffer->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, in_buffer->max_buffer_size); securec_check(errorno,"\0","\0"); in_buffer->free_space = in_buffer->max_buffer_size; in_buffer->start = 0; in_buffer->end = 0; errorno = memcpy_s(in_buffer->data, in_buffer->max_buffer_size, data_in, data_len); securec_check(errorno,"\0","\0"); in_buffer->end += data_len; } in_buffer->free_space -= data_len; return; } static int32 gms_tcp_put_data_to_out_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_out, int32 data_len) { GMS_TCP_CONNECTION_BUFFER *out_buffer = &c_state->out_buffer; errno_t rc = EOK; if (out_buffer->free_space < data_len) { gms_tcp_close_connection_by_state(c_state); ereport(ERROR, (errcode(ERRCODE_BUFFER_TOO_SMALL), errmsg("output bufer is full."))); } rc = memcpy_s(&out_buffer->data[out_buffer->end], out_buffer->max_buffer_size - out_buffer->end, data_out, data_len); securec_check_c(rc, "\0", "\0"); out_buffer->end += data_len; out_buffer->free_space -= data_len; return data_len; } void gms_tcp_release_connection_info(GMS_TCP_CONNECTION_INFO *c_info) { if (c_info->remote_host) { pfree(c_info->remote_host); c_info->remote_host = NULL; } if (c_info->local_host) { pfree(c_info->local_host); c_info->local_host = NULL; } if (c_info->charset) { pfree(c_info->charset); c_info->charset = NULL; } if (c_info->newline) { pfree(c_info->newline); c_info->newline = NULL; } } static void gms_tcp_release_connection_buffer(GMS_TCP_CONNECTION_BUFFER *buffer) { if (buffer->data) { pfree(buffer->data); buffer->data = NULL; } buffer->free_space = 0; buffer->start = 0; buffer->end = 0; } static void gms_tcp_release_connection_state(std::shared_ptr data) { GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast(data).get(); if (c_state->fd) { close(c_state->fd); } gms_tcp_release_connection_buffer(&c_state->in_buffer); gms_tcp_release_connection_buffer(&c_state->out_buffer); gms_tcp_release_connection_info(&c_state->c_info); pfree(c_state); } static void gms_tcp_remove_newline(GMS_TCP_CONNECTION_STATE *c_state, char *data, int32 data_len) { int32 i = 0; char *newline = NULL; int newline_len = 0; errno_t rc = EOK; Assert(c_state->c_info.newline); if (strcmp(c_state->c_info.newline, "CRLF") == 0) { newline_len = 2; newline = "\r\n"; } else { newline_len = strlen(c_state->c_info.newline); newline = c_state->c_info.newline; } while (i <= data_len - newline_len) { if (strncmp(&data[i], newline, newline_len) == 0) { rc = memset_s(&data[i], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, newline_len); securec_check_c(rc, "\0", "\0"); i += newline_len; continue; } i++; } } static int32 gms_tcp_get_data_from_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt) { GMS_TCP_CONNECTION_BUFFER *in_buffer = NULL; errno_t rc = EOK; in_buffer = &c_state->in_buffer; if (in_buffer->end == in_buffer->start) { gms_tcp_get_data_from_connection_to_in_buffer(c_state, data_opt); } /* |----start data end----| */ if (in_buffer->end > in_buffer->start) { int32 rcv_len = (data_opt->len && (data_opt->len < (in_buffer->end - in_buffer->start))) ? data_opt->len : (in_buffer->end - in_buffer->start); rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], rcv_len); securec_check_c(rc, "\0", "\0"); data_opt->data_len = rcv_len; if (!data_opt->peek) { rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, rcv_len); securec_check_c(rc, "\0", "\0"); in_buffer->start += rcv_len; in_buffer->free_space += rcv_len; } } else { /* | data end----start data | */ int after_start = in_buffer->max_buffer_size - in_buffer->start; int32 rcv_len = (data_opt->len && (data_opt->len < (in_buffer->max_buffer_size - in_buffer->free_space))) ? data_opt->len : (in_buffer->max_buffer_size - in_buffer->free_space); if (rcv_len <= after_start) { rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], rcv_len); securec_check_c(rc, "\0", "\0"); if (!data_opt->peek) { rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, rcv_len); securec_check_c(rc, "\0", "\0"); in_buffer->start = (in_buffer->start + rcv_len) % in_buffer->max_buffer_size; in_buffer->free_space += rcv_len; } } else { int32 left_len = rcv_len - after_start; rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], after_start); securec_check_c(rc, "\0", "\0"); data_opt->data_len += after_start; rc = memcpy_s(&data_opt->data[after_start], GMS_TCP_MAX_IN_BUFFER_SIZE - after_start, in_buffer->data, left_len); securec_check_c(rc, "\0", "\0"); data_opt->data_len += left_len; if (!data_opt->peek) { rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, after_start); securec_check_c(rc, "\0", "\0"); rc = memset_s(in_buffer->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, left_len); securec_check_c(rc, "\0", "\0"); in_buffer->start = left_len; in_buffer->free_space += rcv_len; } } } if (data_opt->remove_crlf) { gms_tcp_remove_newline(c_state, data_opt->data, data_opt->data_len); } return data_opt->data_len; } static void gms_tcp_close_connection_by_state(GMS_TCP_CONNECTION_STATE *c_state) { ListCell *lc = NULL; PortalSpecialData *find = NULL; Assert(c_state); Assert(c_state->state == GMS_TCP_CONNECT_OK); foreach (lc, GMS_TCP_FD_BAK_LIST) { PortalSpecialData *node = (PortalSpecialData *)lfirst(lc); if (c_state == std::static_pointer_cast(node->data).get()) { find = node; break; } } if (find) { GMS_TCP_FD_BAK_LIST = list_delete_ptr(GMS_TCP_FD_BAK_LIST, find); find->cleanup(find->data); pfree(find); } } static int32 gms_tcp_get_available_bytes(GMS_TCP_CONNECTION_STATE *c_state) { int32 bytes_to_read = 0; gms_tcp_wait(c_state, false); if (ioctl(c_state->fd, FIONREAD, &bytes_to_read) == 0) { return bytes_to_read; } return 0; } static inline int gms_tcp_set_connection_rcv_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout) { int ret; fd_set rd; struct timeval timeoutval = {timeout, 0}; FD_ZERO(&rd); FD_SET(c_state->fd, &rd); ret = select(c_state->fd + 1, &rd, NULL, NULL, &timeoutval); return ret; } static inline int gms_tcp_set_connection_send_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout) { int ret; fd_set wd; struct timeval timeoutval = {timeout, 0}; FD_ZERO(&wd); FD_SET(c_state->fd, &wd); ret = select(c_state->fd + 1, NULL, &wd, NULL, &timeoutval); return ret; } static void gms_tcp_wait(GMS_TCP_CONNECTION_STATE *c_state, bool report_err) { int32 timeout = 0; if (c_state->c_info.available_wait) { /* recv data by available function. */ if (c_state->c_info.available_timeout) { timeout = c_state->c_info.available_timeout; } } else if (c_state->c_info.get_data_wait) { /* recv data by get/read function. */ if (c_state->c_info.tx_timeout) { timeout = c_state->c_info.tx_timeout; } } if (timeout) { int ret = gms_tcp_set_connection_rcv_timeout(c_state, timeout); if (report_err) { if (ret == 0) { ereport(ERROR, (errcode(ERRCODE_TRANSFER_TIMEOUT), errmsg("recv data timeout."))); } else if (ret < 0) { ereport(ERROR, (errcode(ERRCODE_NETWORK_ERROR), errmsg("recv data error."))); } } } } static void gms_tcp_get_data_from_connection_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt) { GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer; char *data = NULL; int32 recv_len = 0; Assert(in_buffer->max_buffer_size && in_buffer->data); data = (char *)palloc0(in_buffer->max_buffer_size); gms_tcp_wait(c_state, data_opt->report_err); recv_len = recv(c_state->fd, data, in_buffer->max_buffer_size, MSG_DONTWAIT); if (recv_len > 0) { char *decrypt_data = NULL; if (!data_opt->len) { decrypt_data = gms_tcp_decrypt_data(c_state, data); } if (decrypt_data) { gms_tcp_put_data_to_in_buffer(c_state, decrypt_data, strlen(decrypt_data)); } else { gms_tcp_put_data_to_in_buffer(c_state, data, recv_len); } } else if (data_opt->report_err) { gms_tcp_close_connection_by_state(c_state); pfree(data); ereport(ERROR, (errcode(ERRCODE_NETWORK_ERROR), errmsg("recv data error."))); } pfree(data); return; } static bool gms_tcp_check_charset(char *charset) { const char *client_encoding_name = pg_get_client_encoding_name(); iconv_t cd; cd = iconv_open(charset, client_encoding_name); if (cd == ((iconv_t)(-1))) { ereport(WARNING, (errmsg("Can not change string from %s to %s", client_encoding_name, charset))); return false; } iconv_close(cd); return true; } static inline int32 gms_tcp_get_in_buffer_data_bytes(GMS_TCP_CONNECTION_BUFFER *in_buffer) { return in_buffer->max_buffer_size - in_buffer->free_space; } static char * gms_tcp_encode_data_by_charset(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt, bool is_write) { iconv_t cd; char *data_out = NULL; size_t data_out_len = GMS_TCP_MAX_OUT_BUFFER_SIZE; char *data_out_p = NULL; char *data_in_p = NULL; const char *client_encoding_name = pg_get_client_encoding_name(); size_t len = data_opt->len ? data_opt->len : strlen(data_opt->data); if (gms_tcp_comm_cmp_str(c_state->c_info.charset, client_encoding_name)) { return NULL; } if (is_write) { cd = iconv_open(c_state->c_info.charset, client_encoding_name); } else { cd = iconv_open(client_encoding_name, c_state->c_info.charset); } if (cd < 0) { return NULL; } data_out = (char *)palloc0(data_out_len); data_in_p = data_opt->data; data_out_p = data_out; if(iconv(cd, &data_in_p, &len, &data_out_p, &data_out_len) < 0){ iconv_close(cd); pfree(data_out); return NULL; } iconv_close(cd); return data_out; } static char * gms_tcp_encrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data) { /* Do not support encrypt now. */ return NULL; } static char * gms_tcp_decrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data) { /* Do not support decrypt now. */ return NULL; } static void gms_tcp_get_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt) { GMS_TCP_CONNECTION_BUFFER *in_buffer = NULL; errno_t rc = EOK; in_buffer = &c_state->in_buffer; if (in_buffer->max_buffer_size && in_buffer->data) { /* if in buffer is used, get data from in buffer. */ data_opt->data_len = gms_tcp_get_data_from_in_buffer(c_state, data_opt); c_state->c_info.get_data_wait = false; /* recv data from connection to inbuffer no wait. */ data_opt->report_err = false; gms_tcp_get_data_from_connection_to_in_buffer(c_state, data_opt); } else { if (c_state->c_info.tx_timeout) { int ret = gms_tcp_set_connection_rcv_timeout(c_state, c_state->c_info.tx_timeout); if (ret == 0) { ereport(ERROR, (errcode(ERRCODE_TRANSFER_TIMEOUT), errmsg("recv data timeout."))); } else if (ret < 0) { ereport(ERROR, (errcode(ERRCODE_NETWORK_ERROR), errmsg("recv data error."))); } } data_opt->data_len = recv(c_state->fd, data_opt->data, data_opt->len ? data_opt->len : GMS_TCP_MAX_IN_BUFFER_SIZE, MSG_DONTWAIT | (data_opt->peek ? MSG_PEEK : 0)); if (data_opt->data_len <= 0) { gms_tcp_close_connection_by_state(c_state); ereport(ERROR, (errcode(ERRCODE_NETWORK_ERROR), errmsg("recv data error."))); } /* if the data is encrypted, we must get the hold data. */ if (!data_opt->len) { char *decrypt_data = gms_tcp_decrypt_data(c_state, data_opt->data); if (decrypt_data) { rc = memset_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, GMS_TCP_MAX_IN_BUFFER_SIZE); securec_check_c(rc, "\0", "\0"); rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, decrypt_data, strlen(decrypt_data)); securec_check_c(rc, "\0", "\0"); data_opt->data_len = strlen(decrypt_data); } } if (data_opt->remove_crlf) { gms_tcp_remove_newline(c_state, data_opt->data, data_opt->data_len); } } } static int32 gms_tcp_write_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt) { char *data = NULL; int32 data_len = 0; char *encrypt_data = NULL; errno_t rc = EOK; data_len = data_opt->len ? data_opt->len : strlen(data_opt->data); if (c_state->out_buffer.data) { bool done = false; if (data_opt->ch_charset && c_state->c_info.charset) { /* change charset, if data is not NULL, send data, or send data_opt->data. */ data = gms_tcp_encode_data_by_charset(c_state, data_opt, true); if (data) { /* encrypt the data, if encrypt_data is not NULL, send encrypt_data, or send data. */ if (data_opt->encrypt) { encrypt_data = gms_tcp_encrypt_data(c_state, data); } if (encrypt_data) { data_len = gms_tcp_put_data_to_out_buffer(c_state, encrypt_data, strlen(encrypt_data)); } else { data_len = gms_tcp_put_data_to_out_buffer(c_state, data, strlen(data)); } pfree(data); done = true; } } if (!done) { /* * the charset is not change, try encrypt the data, * if encrypt_data is not NULL, send encrypt_data, or send data_opt->data. */ if (data_opt->encrypt) { encrypt_data = gms_tcp_encrypt_data(c_state, data_opt->data); } if (encrypt_data) { data_len = gms_tcp_put_data_to_out_buffer(c_state, encrypt_data, strlen(encrypt_data)); } else { data_len = gms_tcp_put_data_to_out_buffer(c_state, data_opt->data, data_len); } } } else { char *data_t = NULL; bool done = false; if (data_opt->ch_charset && c_state->c_info.charset) { /* change charset, if data is not NULL, send data, or send data_opt->data. */ char *data = gms_tcp_encode_data_by_charset(c_state, data_opt, true); if (data) { /* encrypt the data, if encrypt_data is not NULL, send encrypt_data, or send data. */ if (data_opt->encrypt) { encrypt_data = gms_tcp_encrypt_data(c_state, data); } if (encrypt_data) { data_t = (char *)palloc0(strlen(encrypt_data)); rc = memcpy_s(data_t, strlen(encrypt_data), encrypt_data, strlen(encrypt_data)); securec_check_c(rc, "\0", "\0"); data_len = strlen(encrypt_data); } else { data_t = (char *)palloc0(strlen(data)); rc = memcpy_s(data_t, strlen(data), data, strlen(data)); securec_check_c(rc, "\0", "\0"); data_len = strlen(data); } pfree(data); done = true; } } if (!done) { /* * the charset is not change, try encrypt the data, * if encrypt_data is not NULL, send encrypt_data, or send data_opt->data. */ if (data_opt->encrypt) { encrypt_data = gms_tcp_encrypt_data(c_state, data_opt->data); } if (encrypt_data) { data_t = (char *)palloc0(strlen(encrypt_data)); rc = memcpy_s(data_t, strlen(encrypt_data), encrypt_data, strlen(encrypt_data)); securec_check_c(rc, "\0", "\0"); data_len = strlen(encrypt_data); } else { data_t = (char *)palloc0(strlen(data_opt->data)); rc = memcpy_s(data_t, strlen(data_opt->data), data_opt->data, strlen(data_opt->data)); securec_check_c(rc, "\0", "\0"); } } if (c_state->c_info.tx_timeout) { int ret = gms_tcp_set_connection_send_timeout(c_state, c_state->c_info.tx_timeout); if (ret == 0) { ereport(ERROR, (errcode(ERRCODE_TRANSFER_TIMEOUT), errmsg("recv data timeout."))); } else if (ret < 0) { ereport(ERROR, (errcode(ERRCODE_NETWORK_ERROR), errmsg("recv data error."))); } } data_len = send(c_state->fd, data_t, data_len, MSG_DONTWAIT); pfree(data_t); } PG_RETURN_INT32(data_len); } static void gms_tcp_check_connection_null(GMS_TCP_CONNECTION_STATE *c_state) { if (!c_state) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("get connection fail."))); } else { Assert(c_state->state == GMS_TCP_CONNECT_OK); } } Datum gms_tcp_crlf(PG_FUNCTION_ARGS) { PG_RETURN_TEXT_P(cstring_to_text("\r\n")); } Datum gms_tcp_available_real(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); int32 available_timeout = PG_GETARG_INT32(1); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer; GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; int32 available = 0; gms_tcp_check_connection_null(c_state); if (in_buffer->max_buffer_size && in_buffer->data) { c_state->c_info.available_wait = true; c_state->c_info.available_timeout = 0; data_opt.report_err = false; gms_tcp_get_data_from_connection_to_in_buffer(c_state, &data_opt); available = gms_tcp_get_in_buffer_data_bytes(in_buffer); if (!available && available_timeout) { c_state->c_info.available_timeout = available_timeout; gms_tcp_get_data_from_connection_to_in_buffer(c_state, &data_opt); available = gms_tcp_get_in_buffer_data_bytes(in_buffer); } } else { c_state->c_info.available_wait = true; c_state->c_info.available_timeout = available_timeout; available = gms_tcp_get_available_bytes(c_state); } c_state->c_info.available_wait = false; c_state->c_info.available_timeout = 0; PG_RETURN_INT32(available); } Datum gms_tcp_flush(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); GMS_TCP_CONNECTION_BUFFER *out_buffer = NULL; errno_t rc = EOK; gms_tcp_check_connection_null(c_state); out_buffer = &c_state->out_buffer; if (!out_buffer->data || !out_buffer->max_buffer_size) { PG_RETURN_VOID(); } if (c_state->c_info.tx_timeout) { int ret = gms_tcp_set_connection_send_timeout(c_state, c_state->c_info.tx_timeout); if (ret == 0) { ereport(ERROR, (errcode(ERRCODE_TRANSFER_TIMEOUT), errmsg("recv data timeout."))); } else if (ret < 0) { ereport(ERROR, (errcode(ERRCODE_NETWORK_ERROR), errmsg("recv data error."))); } } send(c_state->fd, out_buffer->data, out_buffer->end, MSG_DONTWAIT); rc = memset_s(out_buffer->data, GMS_TCP_MAX_OUT_BUFFER_SIZE, 0, out_buffer->max_buffer_size); securec_check_c(rc, "\0", "\0"); out_buffer->start = 0; out_buffer->end = 0; out_buffer->free_space = out_buffer->max_buffer_size; PG_RETURN_VOID(); } Datum gms_tcp_close_all_connections(PG_FUNCTION_ARGS) { ListCell* lc = NULL; if (!GMS_TCP_FD_BAK_LIST) { PG_RETURN_VOID(); } foreach (lc, GMS_TCP_FD_BAK_LIST) { PortalSpecialData *node = (PortalSpecialData *)lfirst(lc); node->cleanup(node->data); pfree(node); } list_free_ext(GMS_TCP_FD_BAK_LIST); PG_RETURN_VOID(); } Datum gms_tcp_close_connection(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); gms_tcp_check_connection_null(c_state); gms_tcp_close_connection_by_state(c_state); PG_RETURN_VOID(); } Datum gms_tcp_get_line_real(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); bool remove_crlf = PG_GETARG_BOOL(1); bool peek = PG_GETARG_BOOL(2); bool ch_charset = PG_GETARG_BOOL(3); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; gms_tcp_check_connection_null(c_state); data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE); data_opt.remove_crlf = remove_crlf; data_opt.peek = peek; data_opt.report_err = true; c_state->c_info.get_data_wait = true; gms_tcp_get_data(c_state, &data_opt); c_state->c_info.get_data_wait = false; if (ch_charset && c_state->c_info.charset) { char *data = gms_tcp_encode_data_by_charset(c_state, &data_opt, false); if (data) { pfree(data_opt.data); PG_RETURN_TEXT_P(cstring_to_text(data)); } } PG_RETURN_TEXT_P(cstring_to_text(data_opt.data)); } Datum gms_tcp_get_raw_real(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); int32 len = PG_GETARG_INT32(1); bool peek = PG_GETARG_BOOL(2); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; StringInfoData buf; gms_tcp_check_connection_null(c_state); data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE); data_opt.len = len; data_opt.peek = peek; data_opt.report_err = true; c_state->c_info.get_data_wait = true; gms_tcp_get_data(c_state, &data_opt); c_state->c_info.get_data_wait = false; pq_begintypsend(&buf); pq_sendbytes(&buf, data_opt.data, data_opt.data_len); PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); } Datum gms_tcp_get_text_real(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); int32 len = PG_GETARG_INT32(1); bool peek = PG_GETARG_BOOL(2); bool ch_charset = PG_GETARG_BOOL(3); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; gms_tcp_check_connection_null(c_state); data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE); data_opt.len = len; data_opt.peek = peek; data_opt.ch_charset = ch_charset; data_opt.report_err = true; c_state->c_info.get_data_wait = true; gms_tcp_get_data(c_state, &data_opt); c_state->c_info.get_data_wait = false; if (ch_charset && c_state->c_info.charset) { char *data = gms_tcp_encode_data_by_charset(c_state, &data_opt, false); if (data) { pfree(data_opt.data); PG_RETURN_TEXT_P(cstring_to_text(data)); } } PG_RETURN_TEXT_P(cstring_to_text(data_opt.data)); } Datum gms_tcp_open_connection(PG_FUNCTION_ARGS) { text *remote_host_t = PG_GETARG_TEXT_P(0); char *remote_host = NULL; struct sockaddr_in saddr; int remote_port = PG_GETARG_INT32(1); text *local_host_t = PG_GETARG_TEXT_P(2); char *local_host = NULL; int local_port = PG_GETARG_INT32(3); int in_buffer_size = PG_GETARG_INT32(4); int out_buffer_size = PG_GETARG_INT32(5); text *charset_t = PG_GETARG_TEXT_P(6); char *charset = NULL; text *newline_t = PG_GETARG_TEXT_P(7); char *newline = NULL; int tx_timeout = PG_GETARG_INT32(8); GMS_TCP_CONNECTION_HEAD *c_h = NULL; GMS_TCP_CONNECTION *c = NULL; GMS_TCP_CONNECTION_STATE *c_state = NULL; int offset = 0; c = (GMS_TCP_CONNECTION *)palloc0(sizeof(GMS_TCP_CONNECTION)); c_h = &c->c_h; if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(remote_host_t, &remote_host, GMS_TCP_MAX_HOST_LEN, &c_h->remote_host_len, &c_h->total_len)) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("input remote host too long, max length is %d.", GMS_TCP_MAX_HOST_LEN))); } if (!remote_host) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("input error remote host."))); } if (!gms_tcp_change_remote_host(remote_host, &saddr)) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("input remote host error."))); } if (remote_port) { c_h->remote_port_len = sizeof(c_h->remote_port_len); c_h->total_len += c_h->remote_port_len; } else { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("input remote port error."))); } if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(local_host_t, &local_host, GMS_TCP_MAX_HOST_LEN, &c_h->local_host_len, &c_h->total_len)) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("input local host too long, max length is %d.", GMS_TCP_MAX_HOST_LEN))); } if (local_port) { c_h->local_port_len = sizeof(c_h->local_port_len); c_h->total_len += c_h->local_port_len; } if (in_buffer_size > GMS_TCP_MAX_IN_BUFFER_SIZE) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("in buffer size must be limited in %d.", GMS_TCP_MAX_IN_BUFFER_SIZE))); } else if (in_buffer_size) { c_h->in_buffer_size_len = sizeof(c_h->in_buffer_size_len); c_h->total_len += c_h->in_buffer_size_len; } if (out_buffer_size > GMS_TCP_MAX_OUT_BUFFER_SIZE) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("in buffer size must be limited in %d.", GMS_TCP_MAX_OUT_BUFFER_SIZE))); } else if (out_buffer_size) { c_h->out_buffer_size_len = sizeof(c_h->out_buffer_size_len); c_h->total_len += c_h->out_buffer_size_len; } if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(charset_t, &charset, GMS_TCP_MAX_CHARSET_LEN, &c_h->charset_len, &c_h->total_len)) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("input charset too long, max length is %d.", GMS_TCP_MAX_CHARSET_LEN))); } if (charset && !gms_tcp_check_charset(charset)) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("error charset: %s.", charset))); } if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(newline_t, &newline, 16, &c_h->newline_len, &c_h->total_len)) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("input newline too long, max length is %d.", GMS_TCP_MAX_NEWLINE_LEN))); } if (tx_timeout > GMS_TCP_MAX_TX_TIMEOUT || tx_timeout < 0) { pfree(c); ereport(ERROR, (errcode(ERRCODE_BAD_ARGUMENT), errmsg("tx timeout must be limited in %d ~ %d.", 0, GMS_TCP_MAX_TX_TIMEOUT))); } else if (tx_timeout) { c_h->tx_timeout_len = sizeof(c_h->tx_timeout_len); c_h->total_len += c_h->tx_timeout_len; } Assert(c_h->total_len); gms_tcp_comm_memcopy_str(c->data + offset, c_h->remote_host_len, remote_host, &offset); gms_tcp_comm_memcopy_int(c->data + offset, c_h->remote_port_len, (char *)&remote_port, c_h->remote_port_len, &offset); gms_tcp_comm_memcopy_str(c->data + offset, c_h->local_host_len, local_host, &offset); gms_tcp_comm_memcopy_int(c->data + offset, c_h->local_port_len, (char *)&local_port, c_h->local_port_len, &offset); gms_tcp_comm_memcopy_int(c->data + offset, c_h->in_buffer_size_len, (char *)&in_buffer_size, c_h->in_buffer_size_len, &offset); gms_tcp_comm_memcopy_int(c->data + offset, c_h->out_buffer_size_len, (char *)&out_buffer_size, c_h->out_buffer_size_len, &offset); gms_tcp_comm_memcopy_str(c->data + offset, c_h->charset_len, charset, &offset); gms_tcp_comm_memcopy_str(c->data + offset, c_h->newline_len, newline, &offset); gms_tcp_comm_memcopy_int(c->data + offset, c_h->tx_timeout_len, (char *)&tx_timeout, c_h->tx_timeout_len, &offset); c_state = (GMS_TCP_CONNECTION_STATE *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_STATE)); gms_tcp_get_connection_info(c, &c_state->c_info); c_state->c_info.saddr = saddr; gms_tcp_init_data_buffer(&c_state->in_buffer, in_buffer_size); gms_tcp_init_data_buffer(&c_state->out_buffer, out_buffer_size); if (gms_tcp_connect(c_state) != GMS_TCP_OK) { std::shared_ptr data_cstate = std::shared_ptr(c_state); gms_tcp_release_connection_state(data_cstate); pfree(c); ereport(ERROR, (errcode(ERRCODE_NETWORK_ERROR), errmsg("connect to remote(remote host: %s, remote port: %d) fail.", remote_host, remote_port))); } c_h->fd_len = sizeof(int); gms_tcp_comm_memcopy_int(c->data + offset, c_h->fd_len, (char *)&c_state->fd, c_h->fd_len, &offset); c_h->total_len += c_h->fd_len; c_state->c_info.fd = c_state->fd; gms_tcp_store_connection(c_state); PG_RETURN_POINTER(c); } Datum gms_tcp_write_line(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); text *data_t = PG_GETARG_TEXT_P(1); char *data = text_to_cstring(data_t); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); char *data_out = NULL; char *newline = NULL; int newline_len = 0; int32 len = 0; GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; errno_t rc = EOK; gms_tcp_check_connection_null(c_state); data_out = (char *)palloc0(GMS_TCP_MAX_OUT_BUFFER_SIZE); Assert(c_state->c_info.newline); if (strcmp(c_state->c_info.newline, "CRLF") == 0) { newline_len = 2; newline = "\r\n"; } else { newline_len = 1; newline = "\n"; } rc = memcpy_s(data_out, GMS_TCP_MAX_OUT_BUFFER_SIZE, data, strlen(data)); securec_check_c(rc, "\0", "\0"); data_opt.data = data_out; data_opt.data_len = strlen(data_out); rc = memcpy_s(&data_out[data_opt.data_len], GMS_TCP_MAX_OUT_BUFFER_SIZE - data_opt.data_len, newline, newline_len); securec_check_c(rc, "\0", "\0"); data_opt.data_len += newline_len; data_opt.ch_charset = true; data_opt.encrypt = true; len = gms_tcp_write_data(c_state, &data_opt); PG_RETURN_INT32(len); } Datum gms_tcp_write_raw_real(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); bytea *wbuf = PG_GETARG_BYTEA_P(1); int32 write_len = PG_GETARG_INT32(2); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; int32 len = 0; gms_tcp_check_connection_null(c_state); data_opt.data = VARDATA(wbuf); data_opt.data_len = VARSIZE(wbuf) - VARHDRSZ; data_opt.len = write_len; len = gms_tcp_write_data(c_state, &data_opt); PG_RETURN_INT32(len); } Datum gms_tcp_write_text_real(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); text *data_t = PG_GETARG_TEXT_P(1); int32 write_len = PG_GETARG_INT32(2); char *data = text_to_cstring(data_t); GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c); int32 len = 0; GMS_TCP_CONNECTION_DATA_OPT data_opt = {0}; gms_tcp_check_connection_null(c_state); data_opt.data = data; data_opt.data_len = strlen(data); data_opt.len = write_len; data_opt.ch_charset = true; data_opt.encrypt = false; len = gms_tcp_write_data(c_state, &data_opt); PG_RETURN_INT32(len); } Datum gms_tcp_connection_in(PG_FUNCTION_ARGS) { PG_RETURN_NULL(); } Datum gms_tcp_connection_out(PG_FUNCTION_ARGS) { GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0); GMS_TCP_CONNECTION_INFO *c_info = NULL; StringInfoData str; if(!c) { PG_RETURN_NULL(); } c_info = (GMS_TCP_CONNECTION_INFO *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_INFO)); gms_tcp_get_connection_info(c, c_info); initStringInfo(&str); if (c_info->remote_host) { appendStringInfo(&str, "remote host:"); appendStringInfo(&str, "%s", c_info->remote_host); } if (c_info->remote_port) { appendStringInfoChar(&str, ','); appendStringInfo(&str, "remote port:"); appendStringInfo(&str, "%d", c_info->remote_port); } if (c_info->local_host) { appendStringInfo(&str, "local host:"); appendStringInfo(&str, "%s", c_info->local_host); } if (c_info->local_port) { appendStringInfoChar(&str, ','); appendStringInfo(&str, "local port:"); appendStringInfo(&str, "%d", c_info->local_port); } if (c_info->in_buffer_size) { appendStringInfoChar(&str, ','); appendStringInfo(&str, "in buffer size:"); appendStringInfo(&str, "%d", c_info->in_buffer_size); } if (c_info->out_buffer_size) { appendStringInfoChar(&str, ','); appendStringInfo(&str, "out buffer size:"); appendStringInfo(&str, "%d", c_info->out_buffer_size); } if (c_info->charset) { appendStringInfoChar(&str, ','); appendStringInfo(&str, "charset:"); appendStringInfo(&str, "%s", c_info->charset); } if (c_info->newline && (strcmp(c_info->newline, "CRLF") != 0)) { appendStringInfoChar(&str, ','); appendStringInfo(&str, "newline:"); appendStringInfo(&str, "%s", c_info->newline); } if (c_info->tx_timeout && (c_info->tx_timeout != GMS_TCP_MAX_TX_TIMEOUT)) { appendStringInfoChar(&str, ','); appendStringInfo(&str, "tx timeout:"); appendStringInfo(&str, "%d", c_info->tx_timeout); } gms_tcp_release_connection_info(c_info); pfree(c_info); PG_RETURN_CSTRING(str.data); } static void gms_tcp_comm_memcopy_str(char *dst, int dst_len, char *src, int *offset) { errno_t rc = EOK; if (src) { int len = 0; int src_len = strlen(src); rc = memcpy_s(dst, dst_len, src, src_len); securec_check_c(rc, "\0", "\0"); len += src_len; dst[src_len] = '\0'; len++; if (offset) { *offset += len; } } } static void gms_tcp_comm_memcopy_int(char *dst, int dst_len, char *src, int src_len, int *offset) { errno_t rc = EOK; if (src_len) { rc = memcpy_s(dst, dst_len, src, src_len); securec_check_c(rc, "\0", "\0"); if (offset) { *offset += src_len; } } } static int gms_tcp_comm_get_str(text *data_t, char **data, int data_len_max, int *get_data_len, int *total_len) { if (data_t) { char *p = text_to_cstring(data_t); int data_len = strlen(p) + 1; if (data_len >= data_len_max) { return ORAFCE_COMM_STRING_TOO_LONG; } if (!(strlen(p) == 1 && p[0] == '0')) { *get_data_len = data_len; *total_len += data_len; *data = p; } } return 0; } static bool gms_tcp_comm_cmp_str(const char *str_1, const char *str_2) { if ((str_1 && str_2) && (strlen(str_1) == strlen(str_2)) && (memcmp(str_1, str_2, strlen(str_1)) == 0)) { return true; } else if (!str_1 && !str_2) { return true; } else { return false; } }