Files
openGauss-server/contrib/gms_tcp/gms_tcp.cpp
2024-11-03 18:23:03 -08:00

1500 lines
50 KiB
C++

/*
* This code implements the functons of gms_tcp
*/
#include "gms_tcp.h"
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(gms_tcp_crlf);
PG_FUNCTION_INFO_V1(gms_tcp_available_real);
PG_FUNCTION_INFO_V1(gms_tcp_close_all_connections);
PG_FUNCTION_INFO_V1(gms_tcp_close_connection);
PG_FUNCTION_INFO_V1(gms_tcp_flush);
PG_FUNCTION_INFO_V1(gms_tcp_get_line_real);
PG_FUNCTION_INFO_V1(gms_tcp_get_raw_real);
PG_FUNCTION_INFO_V1(gms_tcp_get_text_real);
PG_FUNCTION_INFO_V1(gms_tcp_open_connection);
PG_FUNCTION_INFO_V1(gms_tcp_write_line);
PG_FUNCTION_INFO_V1(gms_tcp_write_raw_real);
PG_FUNCTION_INFO_V1(gms_tcp_write_text_real);
PG_FUNCTION_INFO_V1(gms_tcp_connection_in);
PG_FUNCTION_INFO_V1(gms_tcp_connection_out);
static void *gms_tcp_alloc(int len);
static inline List *gms_tcp_lappend(List* list, void* datum);
static inline void gms_tcp_init_data_buffer(GMS_TCP_CONNECTION_BUFFER *c_buffer, int32 buffer_size);
static bool gms_tcp_get_addr_by_hostname(char *remotehost, struct sockaddr_in *saddr);
static bool gms_tcp_change_remote_host(char *remotehost, struct sockaddr_in *saddr);
static int gms_tcp_connect(GMS_TCP_CONNECTION_STATE *c_state);
static void gms_tcp_store_connection(GMS_TCP_CONNECTION_STATE *c_state);
static GMS_TCP_CONNECTION_STATE *gms_tcp_get_connection_state(GMS_TCP_CONNECTION *c);
static void gms_tcp_put_data_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_in, int32 data_len);
static int32 gms_tcp_put_data_to_out_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_out, int32 data_len);
static void gms_tcp_release_connection_buffer(GMS_TCP_CONNECTION_BUFFER *buffer);
static void gms_tcp_release_connection_state(std::shared_ptr<void> data);
static void gms_tcp_remove_newline(GMS_TCP_CONNECTION_STATE *c_state, char *data, int32 data_len);
static int32 gms_tcp_get_data_from_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt);
static void gms_tcp_close_connection_by_state(GMS_TCP_CONNECTION_STATE *c_state);
static int32 gms_tcp_get_available_bytes(GMS_TCP_CONNECTION_STATE *c_state);
static inline int gms_tcp_set_connection_rcv_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout);
static inline int gms_tcp_set_connection_send_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout);
static void gms_tcp_wait(GMS_TCP_CONNECTION_STATE *c_state, bool report_err);
static void gms_tcp_get_data_from_connection_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt);
static bool gms_tcp_check_charset(char *charset);
static inline int32 gms_tcp_get_in_buffer_data_bytes(GMS_TCP_CONNECTION_BUFFER *in_buffer);
static char *gms_tcp_encode_data_by_charset(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt, bool is_write);
static char *gms_tcp_encrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data);
static char *gms_tcp_decrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data);
static void gms_tcp_get_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt);
static int32 gms_tcp_write_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt);
static void gms_tcp_check_connection_null(GMS_TCP_CONNECTION_STATE *c_state);
static void gms_tcp_comm_memcopy_str(char *dst, int dst_len, char *src, int *offset);
static void gms_tcp_comm_memcopy_int(char *dst, int dst_len, char *src, int src_len, int *offset);
static int gms_tcp_comm_get_str(text *data_t, char **data, int data_len_max, int *get_data_len, int *total_len);
static bool gms_tcp_comm_cmp_str(const char *str_1, const char *str_2);
static void *
gms_tcp_alloc(int len)
{
void *result = NULL;
MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt);
result = palloc0(len);
MemoryContextSwitchTo(old_context);
return result;
}
static inline List *
gms_tcp_lappend(List* list, void* datum)
{
List *result = NULL;
MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt);
result = lappend(list, datum);
MemoryContextSwitchTo(old_context);
return result;
}
static inline List *
gms_tcp_lappend_int(List* list, int datum)
{
List *result = NULL;
MemoryContext old_context = MemoryContextSwitchTo(t_thrd.mem_cxt.portal_mem_cxt);
result = lappend_int(list, datum);
MemoryContextSwitchTo(old_context);
return result;
}
void
gms_tcp_get_connection_info(GMS_TCP_CONNECTION *c, GMS_TCP_CONNECTION_INFO *c_info)
{
GMS_TCP_CONNECTION_HEAD *c_h = &c->c_h;
int offset = 0;
if (c_h->remote_host_len) {
c_info->remote_host = (char *)gms_tcp_alloc(c_h->remote_host_len);
gms_tcp_comm_memcopy_str(c_info->remote_host, c_h->remote_host_len, c->data + offset, &offset);
}
if (c_h->remote_port_len) {
gms_tcp_comm_memcopy_int((char *)&c_info->remote_port, c_h->remote_port_len, c->data + offset, c_h->remote_port_len, &offset);
}
if (c_h->local_host_len) {
c_info->local_host = (char *)gms_tcp_alloc(c_h->local_host_len);
gms_tcp_comm_memcopy_str(c_info->local_host, c_h->local_host_len, c->data + offset, &offset);
}
if (c_h->local_port_len) {
gms_tcp_comm_memcopy_int((char *)&c_info->local_port, c_h->local_port_len, c->data + offset, c_h->local_port_len, &offset);
}
if (c_h->in_buffer_size_len) {
gms_tcp_comm_memcopy_int((char *)&c_info->in_buffer_size, c_h->in_buffer_size_len, c->data + offset, c_h->in_buffer_size_len, &offset);
}
if (c_h->out_buffer_size_len) {
gms_tcp_comm_memcopy_int((char *)&c_info->out_buffer_size, c_h->out_buffer_size_len, c->data + offset, c_h->out_buffer_size_len, &offset);
}
if (c_h->charset_len) {
c_info->charset = (char *)gms_tcp_alloc(c_h->charset_len);
gms_tcp_comm_memcopy_str(c_info->charset, c_h->charset_len, c->data + offset, &offset);
}
if (c_h->newline_len) {
c_info->newline = (char *)gms_tcp_alloc(c_h->newline_len);
gms_tcp_comm_memcopy_str(c_info->newline, c_h->newline_len, c->data + offset, &offset);
}
if (c_h->tx_timeout_len) {
gms_tcp_comm_memcopy_int((char *)&c_info->tx_timeout, c_h->tx_timeout_len, c->data + offset, c_h->tx_timeout_len, &offset);
}
if (c_h->fd_len) {
gms_tcp_comm_memcopy_int((char *)&c_info->fd, c_h->fd_len, c->data + offset, c_h->fd_len, &offset);
}
return;
}
void
gms_tcp_get_connection_head_by_info(GMS_TCP_CONNECTION_INFO *c_info, GMS_TCP_CONNECTION_HEAD *c_h)
{
if (c_info->remote_host) {
c_h->remote_host_len = strlen(c_info->remote_host) + 1;
c_h->total_len += c_h->remote_host_len;
}
if (c_info->remote_port) {
c_h->remote_port_len = sizeof(c_info->remote_port);
c_h->total_len += c_h->remote_port_len;
}
if (c_info->local_host) {
c_h->local_host_len = strlen(c_info->local_host) + 1;
c_h->total_len += c_h->local_host_len;
}
if (c_info->local_port) {
c_h->local_port_len = sizeof(c_info->local_port);
c_h->total_len += c_h->local_port_len;
}
if (c_info->in_buffer_size) {
c_h->in_buffer_size_len = sizeof(c_info->in_buffer_size);
c_h->total_len += c_h->in_buffer_size_len;
}
if (c_info->out_buffer_size) {
c_h->out_buffer_size_len = sizeof(c_info->out_buffer_size);
c_h->total_len += c_h->out_buffer_size_len;
}
if (c_info->charset) {
c_h->charset_len = strlen(c_info->charset) + 1;
c_h->total_len += c_h->charset_len;
}
if (c_info->newline) {
c_h->newline_len = strlen(c_info->newline) + 1;
c_h->total_len += c_h->newline_len;
}
if (c_info->tx_timeout) {
c_h->tx_timeout_len = sizeof(c_info->tx_timeout);
c_h->total_len += c_h->tx_timeout_len;
}
if (c_info->fd) {
c_h->fd_len = sizeof(c_info->fd);
c_h->total_len += c_h->fd_len;
}
return;
}
static inline void
gms_tcp_init_data_buffer(GMS_TCP_CONNECTION_BUFFER *c_buffer, int32 buffer_size)
{
errno_t rc = EOK;
rc = memset_s(c_buffer, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, sizeof(GMS_TCP_CONNECTION_BUFFER));
securec_check_c(rc, "\0", "\0");
if (buffer_size) {
c_buffer->max_buffer_size = buffer_size;
c_buffer->free_space = buffer_size;
c_buffer->data = (char *)gms_tcp_alloc(buffer_size);
}
}
static bool
gms_tcp_get_addr_by_hostname(char *remotehost, struct sockaddr_in *saddr)
{
struct hostent *hptr = gethostbyname(remotehost);
if (!hptr) {
return false;
}
if (hptr->h_addrtype == AF_INET) {
saddr->sin_addr = *((struct in_addr *)hptr->h_addr_list[0]);
return true;
}
return false;
}
static bool
gms_tcp_change_remote_host(char *remotehost, struct sockaddr_in *saddr)
{
int ret = 0;
ret = inet_aton(remotehost, &saddr->sin_addr);
if (!ret) {
return gms_tcp_get_addr_by_hostname(remotehost, saddr);
}
return true;
}
static int
gms_tcp_connect(GMS_TCP_CONNECTION_STATE *c_state)
{
c_state->fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in saddr;
int res = 0;
errno_t rc = EOK;
rc = memset_s(&saddr, GMS_TCP_MAX_IN_BUFFER_SIZE, 0,sizeof(saddr));
securec_check_c(rc, "\0", "\0");
saddr.sin_family = AF_INET;
saddr.sin_port = htons(c_state->c_info.remote_port);
saddr.sin_addr = c_state->c_info.saddr.sin_addr;
res = connect(c_state->fd, (struct sockaddr *)&saddr, sizeof(saddr));
if (res) {
c_state->fd = 0;
return GMS_TCP_CONNECT_FAIL;
}
c_state->state = GMS_TCP_CONNECT_OK;
return GMS_TCP_OK;
}
static void
gms_tcp_store_connection(GMS_TCP_CONNECTION_STATE *c_state)
{
PortalSpecialData *node;
node = (PortalSpecialData *)gms_tcp_alloc(sizeof(PortalSpecialData));
node->data = std::shared_ptr<void>(c_state);
node->cleanup = gms_tcp_release_connection_state;
GMS_TCP_FD_BAK_LIST = gms_tcp_lappend(GMS_TCP_FD_BAK_LIST, node);
}
static GMS_TCP_CONNECTION_STATE *
gms_tcp_get_connection_state(GMS_TCP_CONNECTION *c)
{
ListCell *lc = NULL;
GMS_TCP_CONNECTION_INFO *c_info = NULL;
GMS_TCP_CONNECTION_STATE *result = NULL;
c_info = (GMS_TCP_CONNECTION_INFO *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_INFO));
gms_tcp_get_connection_info(c, c_info);
foreach (lc, GMS_TCP_FD_BAK_LIST) {
PortalSpecialData *node = (PortalSpecialData *)lfirst(lc);
GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast<GMS_TCP_CONNECTION_STATE>(node->data).get();
if (c_info->fd == c_state->fd) {
result = c_state;
break;
}
}
gms_tcp_release_connection_info(c_info);
pfree(c_info);
return result;
}
GMS_TCP_CONNECTION_STATE *
gms_tcp_get_connection_state_by_fd(int fd)
{
ListCell *lc = NULL;
GMS_TCP_CONNECTION_STATE *result = NULL;
foreach (lc, GMS_TCP_FD_BAK_LIST) {
PortalSpecialData *node = (PortalSpecialData *)lfirst(lc);
GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast<GMS_TCP_CONNECTION_STATE>(node->data).get();
if (fd == c_state->fd) {
result = c_state;
break;
}
}
return result;
}
static void
gms_tcp_put_data_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_in, int32 data_len)
{
GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer;
int32 left_len = 0;
error_t errorno = EOK;
if (in_buffer->free_space <= data_len) {
gms_tcp_close_connection_by_state(c_state);
ereport(ERROR,
(errcode(ERRCODE_BUFFER_TOO_SMALL),
errmsg("input bufer is full.")));
}
if (in_buffer->end > in_buffer->start) {
/* |----start data end----| */
left_len = in_buffer->max_buffer_size - in_buffer->end;
if (left_len > data_len) {
errorno = memcpy_s(&in_buffer->data[in_buffer->end], left_len, data_in, data_len);
securec_check_c(errorno, "\0", "\0");
in_buffer->end += data_len;
} else {
errorno = memcpy_s(&in_buffer->data[in_buffer->end], left_len, data_in, left_len);
securec_check_c(errorno, "\0", "\0");
errorno = memcpy_s(in_buffer->data, in_buffer->start, &data_in[left_len], (data_len - left_len));
securec_check_c(errorno, "\0", "\0");
in_buffer->end = (data_len - left_len);
}
} else if (in_buffer->start > in_buffer->end){
/* | data end----start data | */
errorno = memcpy_s(&in_buffer->data[in_buffer->end], in_buffer->start - in_buffer->end, data_in, data_len);
securec_check_c(errorno, "\0", "\0");
in_buffer->end += data_len;
} else {
/* in buffer is empty, reset it. */
errorno = memset_s(in_buffer->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, in_buffer->max_buffer_size);
securec_check(errorno,"\0","\0");
in_buffer->free_space = in_buffer->max_buffer_size;
in_buffer->start = 0;
in_buffer->end = 0;
errorno = memcpy_s(in_buffer->data, in_buffer->max_buffer_size, data_in, data_len);
securec_check(errorno,"\0","\0");
in_buffer->end += data_len;
}
in_buffer->free_space -= data_len;
return;
}
static int32
gms_tcp_put_data_to_out_buffer(GMS_TCP_CONNECTION_STATE *c_state, char *data_out, int32 data_len)
{
GMS_TCP_CONNECTION_BUFFER *out_buffer = &c_state->out_buffer;
errno_t rc = EOK;
if (out_buffer->free_space < data_len) {
gms_tcp_close_connection_by_state(c_state);
ereport(ERROR,
(errcode(ERRCODE_BUFFER_TOO_SMALL),
errmsg("output bufer is full.")));
}
rc = memcpy_s(&out_buffer->data[out_buffer->end], out_buffer->max_buffer_size - out_buffer->end, data_out, data_len);
securec_check_c(rc, "\0", "\0");
out_buffer->end += data_len;
out_buffer->free_space -= data_len;
return data_len;
}
void
gms_tcp_release_connection_info(GMS_TCP_CONNECTION_INFO *c_info)
{
if (c_info->remote_host) {
pfree(c_info->remote_host);
c_info->remote_host = NULL;
}
if (c_info->local_host) {
pfree(c_info->local_host);
c_info->local_host = NULL;
}
if (c_info->charset) {
pfree(c_info->charset);
c_info->charset = NULL;
}
if (c_info->newline) {
pfree(c_info->newline);
c_info->newline = NULL;
}
}
static void
gms_tcp_release_connection_buffer(GMS_TCP_CONNECTION_BUFFER *buffer)
{
if (buffer->data) {
pfree(buffer->data);
buffer->data = NULL;
}
buffer->free_space = 0;
buffer->start = 0;
buffer->end = 0;
}
static void
gms_tcp_release_connection_state(std::shared_ptr<void> data)
{
GMS_TCP_CONNECTION_STATE *c_state = std::static_pointer_cast<GMS_TCP_CONNECTION_STATE>(data).get();
if (c_state->fd) {
close(c_state->fd);
}
gms_tcp_release_connection_buffer(&c_state->in_buffer);
gms_tcp_release_connection_buffer(&c_state->out_buffer);
gms_tcp_release_connection_info(&c_state->c_info);
pfree(c_state);
}
static void
gms_tcp_remove_newline(GMS_TCP_CONNECTION_STATE *c_state, char *data, int32 data_len)
{
int32 i = 0;
char *newline = NULL;
int newline_len = 0;
errno_t rc = EOK;
Assert(c_state->c_info.newline);
if (strcmp(c_state->c_info.newline, "CRLF") == 0) {
newline_len = 2;
newline = "\r\n";
} else {
newline_len = strlen(c_state->c_info.newline);
newline = c_state->c_info.newline;
}
while (i <= data_len - newline_len) {
if (strncmp(&data[i], newline, newline_len) == 0) {
rc = memset_s(&data[i], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, newline_len);
securec_check_c(rc, "\0", "\0");
i += newline_len;
continue;
}
i++;
}
}
static int32
gms_tcp_get_data_from_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt)
{
GMS_TCP_CONNECTION_BUFFER *in_buffer = NULL;
errno_t rc = EOK;
in_buffer = &c_state->in_buffer;
if (in_buffer->end == in_buffer->start) {
gms_tcp_get_data_from_connection_to_in_buffer(c_state, data_opt);
}
/* |----start data end----| */
if (in_buffer->end > in_buffer->start) {
int32 rcv_len = (data_opt->len && (data_opt->len < (in_buffer->end - in_buffer->start))) ?
data_opt->len :
(in_buffer->end - in_buffer->start);
rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], rcv_len);
securec_check_c(rc, "\0", "\0");
data_opt->data_len = rcv_len;
if (!data_opt->peek) {
rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, rcv_len);
securec_check_c(rc, "\0", "\0");
in_buffer->start += rcv_len;
in_buffer->free_space += rcv_len;
}
} else {
/* | data end----start data | */
int after_start = in_buffer->max_buffer_size - in_buffer->start;
int32 rcv_len = (data_opt->len && (data_opt->len < (in_buffer->max_buffer_size - in_buffer->free_space))) ?
data_opt->len :
(in_buffer->max_buffer_size - in_buffer->free_space);
if (rcv_len <= after_start) {
rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], rcv_len);
securec_check_c(rc, "\0", "\0");
if (!data_opt->peek) {
rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, rcv_len);
securec_check_c(rc, "\0", "\0");
in_buffer->start = (in_buffer->start + rcv_len) % in_buffer->max_buffer_size;
in_buffer->free_space += rcv_len;
}
} else {
int32 left_len = rcv_len - after_start;
rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, &in_buffer->data[in_buffer->start], after_start);
securec_check_c(rc, "\0", "\0");
data_opt->data_len += after_start;
rc = memcpy_s(&data_opt->data[after_start], GMS_TCP_MAX_IN_BUFFER_SIZE - after_start, in_buffer->data, left_len);
securec_check_c(rc, "\0", "\0");
data_opt->data_len += left_len;
if (!data_opt->peek) {
rc = memset_s(&in_buffer->data[in_buffer->start], GMS_TCP_MAX_IN_BUFFER_SIZE, 0, after_start);
securec_check_c(rc, "\0", "\0");
rc = memset_s(in_buffer->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, left_len);
securec_check_c(rc, "\0", "\0");
in_buffer->start = left_len;
in_buffer->free_space += rcv_len;
}
}
}
if (data_opt->remove_crlf) {
gms_tcp_remove_newline(c_state, data_opt->data, data_opt->data_len);
}
return data_opt->data_len;
}
static void
gms_tcp_close_connection_by_state(GMS_TCP_CONNECTION_STATE *c_state)
{
ListCell *lc = NULL;
PortalSpecialData *find = NULL;
Assert(c_state);
Assert(c_state->state == GMS_TCP_CONNECT_OK);
foreach (lc, GMS_TCP_FD_BAK_LIST) {
PortalSpecialData *node = (PortalSpecialData *)lfirst(lc);
if (c_state == std::static_pointer_cast<GMS_TCP_CONNECTION_STATE>(node->data).get()) {
find = node;
break;
}
}
if (find) {
GMS_TCP_FD_BAK_LIST = list_delete_ptr(GMS_TCP_FD_BAK_LIST, find);
find->cleanup(find->data);
pfree(find);
}
}
static int32
gms_tcp_get_available_bytes(GMS_TCP_CONNECTION_STATE *c_state)
{
int32 bytes_to_read = 0;
gms_tcp_wait(c_state, false);
if (ioctl(c_state->fd, FIONREAD, &bytes_to_read) == 0) {
return bytes_to_read;
}
return 0;
}
static inline int
gms_tcp_set_connection_rcv_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout)
{
int ret;
fd_set rd;
struct timeval timeoutval = {timeout, 0};
FD_ZERO(&rd);
FD_SET(c_state->fd, &rd);
ret = select(c_state->fd + 1, &rd, NULL, NULL, &timeoutval);
return ret;
}
static inline int
gms_tcp_set_connection_send_timeout(GMS_TCP_CONNECTION_STATE *c_state, int32 timeout)
{
int ret;
fd_set wd;
struct timeval timeoutval = {timeout, 0};
FD_ZERO(&wd);
FD_SET(c_state->fd, &wd);
ret = select(c_state->fd + 1, NULL, &wd, NULL, &timeoutval);
return ret;
}
static void
gms_tcp_wait(GMS_TCP_CONNECTION_STATE *c_state, bool report_err)
{
int32 timeout = 0;
if (c_state->c_info.available_wait) {
/* recv data by available function. */
if (c_state->c_info.available_timeout) {
timeout = c_state->c_info.available_timeout;
}
} else if (c_state->c_info.get_data_wait) {
/* recv data by get/read function. */
if (c_state->c_info.tx_timeout) {
timeout = c_state->c_info.tx_timeout;
}
}
if (timeout) {
int ret = gms_tcp_set_connection_rcv_timeout(c_state, timeout);
if (report_err) {
if (ret == 0) {
ereport(ERROR,
(errcode(ERRCODE_TRANSFER_TIMEOUT),
errmsg("recv data timeout.")));
} else if (ret < 0) {
ereport(ERROR,
(errcode(ERRCODE_NETWORK_ERROR),
errmsg("recv data error.")));
}
}
}
}
static void
gms_tcp_get_data_from_connection_to_in_buffer(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt)
{
GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer;
char *data = NULL;
int32 recv_len = 0;
Assert(in_buffer->max_buffer_size && in_buffer->data);
data = (char *)palloc0(in_buffer->max_buffer_size);
gms_tcp_wait(c_state, data_opt->report_err);
recv_len = recv(c_state->fd, data, in_buffer->max_buffer_size, MSG_DONTWAIT);
if (recv_len > 0) {
char *decrypt_data = NULL;
if (!data_opt->len) {
decrypt_data = gms_tcp_decrypt_data(c_state, data);
}
if (decrypt_data) {
gms_tcp_put_data_to_in_buffer(c_state, decrypt_data, strlen(decrypt_data));
} else {
gms_tcp_put_data_to_in_buffer(c_state, data, recv_len);
}
} else if (data_opt->report_err) {
gms_tcp_close_connection_by_state(c_state);
pfree(data);
ereport(ERROR,
(errcode(ERRCODE_NETWORK_ERROR),
errmsg("recv data error.")));
}
pfree(data);
return;
}
static bool
gms_tcp_check_charset(char *charset)
{
const char *client_encoding_name = pg_get_client_encoding_name();
iconv_t cd;
cd = iconv_open(charset, client_encoding_name);
if (cd == ((iconv_t)(-1))) {
ereport(WARNING,
(errmsg("Can not change string from %s to %s", client_encoding_name, charset)));
return false;
}
iconv_close(cd);
return true;
}
static inline int32
gms_tcp_get_in_buffer_data_bytes(GMS_TCP_CONNECTION_BUFFER *in_buffer)
{
return in_buffer->max_buffer_size - in_buffer->free_space;
}
static char *
gms_tcp_encode_data_by_charset(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt, bool is_write)
{
iconv_t cd;
char *data_out = NULL;
size_t data_out_len = GMS_TCP_MAX_OUT_BUFFER_SIZE;
char *data_out_p = NULL;
char *data_in_p = NULL;
const char *client_encoding_name = pg_get_client_encoding_name();
size_t len = data_opt->len ? data_opt->len : strlen(data_opt->data);
if (gms_tcp_comm_cmp_str(c_state->c_info.charset, client_encoding_name)) {
return NULL;
}
if (is_write) {
cd = iconv_open(c_state->c_info.charset, client_encoding_name);
} else {
cd = iconv_open(client_encoding_name, c_state->c_info.charset);
}
if (cd < 0) {
return NULL;
}
data_out = (char *)palloc0(data_out_len);
data_in_p = data_opt->data;
data_out_p = data_out;
if(iconv(cd, &data_in_p, &len, &data_out_p, &data_out_len) < 0){
iconv_close(cd);
pfree(data_out);
return NULL;
}
iconv_close(cd);
return data_out;
}
static char *
gms_tcp_encrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data)
{
/* Do not support encrypt now. */
return NULL;
}
static char *
gms_tcp_decrypt_data(GMS_TCP_CONNECTION_STATE *c_state, char *data)
{
/* Do not support decrypt now. */
return NULL;
}
static void
gms_tcp_get_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt)
{
GMS_TCP_CONNECTION_BUFFER *in_buffer = NULL;
errno_t rc = EOK;
in_buffer = &c_state->in_buffer;
if (in_buffer->max_buffer_size && in_buffer->data) {
/* if in buffer is used, get data from in buffer. */
data_opt->data_len = gms_tcp_get_data_from_in_buffer(c_state, data_opt);
c_state->c_info.get_data_wait = false;
/* recv data from connection to inbuffer no wait. */
data_opt->report_err = false;
gms_tcp_get_data_from_connection_to_in_buffer(c_state, data_opt);
} else {
if (c_state->c_info.tx_timeout) {
int ret = gms_tcp_set_connection_rcv_timeout(c_state, c_state->c_info.tx_timeout);
if (ret == 0) {
ereport(ERROR,
(errcode(ERRCODE_TRANSFER_TIMEOUT),
errmsg("recv data timeout.")));
} else if (ret < 0) {
ereport(ERROR,
(errcode(ERRCODE_NETWORK_ERROR),
errmsg("recv data error.")));
}
}
data_opt->data_len = recv(c_state->fd,
data_opt->data,
data_opt->len ? data_opt->len : GMS_TCP_MAX_IN_BUFFER_SIZE,
MSG_DONTWAIT | (data_opt->peek ? MSG_PEEK : 0));
if (data_opt->data_len <= 0) {
gms_tcp_close_connection_by_state(c_state);
ereport(ERROR,
(errcode(ERRCODE_NETWORK_ERROR),
errmsg("recv data error.")));
}
/* if the data is encrypted, we must get the hold data. */
if (!data_opt->len) {
char *decrypt_data = gms_tcp_decrypt_data(c_state, data_opt->data);
if (decrypt_data) {
rc = memset_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, 0, GMS_TCP_MAX_IN_BUFFER_SIZE);
securec_check_c(rc, "\0", "\0");
rc = memcpy_s(data_opt->data, GMS_TCP_MAX_IN_BUFFER_SIZE, decrypt_data, strlen(decrypt_data));
securec_check_c(rc, "\0", "\0");
data_opt->data_len = strlen(decrypt_data);
}
}
if (data_opt->remove_crlf) {
gms_tcp_remove_newline(c_state, data_opt->data, data_opt->data_len);
}
}
}
static int32
gms_tcp_write_data(GMS_TCP_CONNECTION_STATE *c_state, GMS_TCP_CONNECTION_DATA_OPT *data_opt)
{
char *data = NULL;
int32 data_len = 0;
char *encrypt_data = NULL;
errno_t rc = EOK;
data_len = data_opt->len ? data_opt->len : strlen(data_opt->data);
if (c_state->out_buffer.data) {
bool done = false;
if (data_opt->ch_charset && c_state->c_info.charset) {
/* change charset, if data is not NULL, send data, or send data_opt->data. */
data = gms_tcp_encode_data_by_charset(c_state, data_opt, true);
if (data) {
/* encrypt the data, if encrypt_data is not NULL, send encrypt_data, or send data. */
if (data_opt->encrypt) {
encrypt_data = gms_tcp_encrypt_data(c_state, data);
}
if (encrypt_data) {
data_len = gms_tcp_put_data_to_out_buffer(c_state, encrypt_data, strlen(encrypt_data));
} else {
data_len = gms_tcp_put_data_to_out_buffer(c_state, data, strlen(data));
}
pfree(data);
done = true;
}
}
if (!done) {
/*
* the charset is not change, try encrypt the data,
* if encrypt_data is not NULL, send encrypt_data, or send data_opt->data.
*/
if (data_opt->encrypt) {
encrypt_data = gms_tcp_encrypt_data(c_state, data_opt->data);
}
if (encrypt_data) {
data_len = gms_tcp_put_data_to_out_buffer(c_state, encrypt_data, strlen(encrypt_data));
} else {
data_len = gms_tcp_put_data_to_out_buffer(c_state, data_opt->data, data_len);
}
}
} else {
char *data_t = NULL;
bool done = false;
if (data_opt->ch_charset && c_state->c_info.charset) {
/* change charset, if data is not NULL, send data, or send data_opt->data. */
char *data = gms_tcp_encode_data_by_charset(c_state, data_opt, true);
if (data) {
/* encrypt the data, if encrypt_data is not NULL, send encrypt_data, or send data. */
if (data_opt->encrypt) {
encrypt_data = gms_tcp_encrypt_data(c_state, data);
}
if (encrypt_data) {
data_t = (char *)palloc0(strlen(encrypt_data));
rc = memcpy_s(data_t, strlen(encrypt_data), encrypt_data, strlen(encrypt_data));
securec_check_c(rc, "\0", "\0");
data_len = strlen(encrypt_data);
} else {
data_t = (char *)palloc0(strlen(data));
rc = memcpy_s(data_t, strlen(data), data, strlen(data));
securec_check_c(rc, "\0", "\0");
data_len = strlen(data);
}
pfree(data);
done = true;
}
}
if (!done) {
/*
* the charset is not change, try encrypt the data,
* if encrypt_data is not NULL, send encrypt_data, or send data_opt->data.
*/
if (data_opt->encrypt) {
encrypt_data = gms_tcp_encrypt_data(c_state, data_opt->data);
}
if (encrypt_data) {
data_t = (char *)palloc0(strlen(encrypt_data));
rc = memcpy_s(data_t, strlen(encrypt_data), encrypt_data, strlen(encrypt_data));
securec_check_c(rc, "\0", "\0");
data_len = strlen(encrypt_data);
} else {
data_t = (char *)palloc0(strlen(data_opt->data));
rc = memcpy_s(data_t, strlen(data_opt->data), data_opt->data, strlen(data_opt->data));
securec_check_c(rc, "\0", "\0");
}
}
if (c_state->c_info.tx_timeout) {
int ret = gms_tcp_set_connection_send_timeout(c_state, c_state->c_info.tx_timeout);
if (ret == 0) {
ereport(ERROR,
(errcode(ERRCODE_TRANSFER_TIMEOUT),
errmsg("recv data timeout.")));
} else if (ret < 0) {
ereport(ERROR,
(errcode(ERRCODE_NETWORK_ERROR),
errmsg("recv data error.")));
}
}
data_len = send(c_state->fd, data_t, data_len, MSG_DONTWAIT);
pfree(data_t);
}
PG_RETURN_INT32(data_len);
}
static void
gms_tcp_check_connection_null(GMS_TCP_CONNECTION_STATE *c_state)
{
if (!c_state) {
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("get connection fail.")));
} else {
Assert(c_state->state == GMS_TCP_CONNECT_OK);
}
}
Datum
gms_tcp_crlf(PG_FUNCTION_ARGS)
{
PG_RETURN_TEXT_P(cstring_to_text("\r\n"));
}
Datum
gms_tcp_available_real(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
int32 available_timeout = PG_GETARG_INT32(1);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
GMS_TCP_CONNECTION_BUFFER *in_buffer = &c_state->in_buffer;
GMS_TCP_CONNECTION_DATA_OPT data_opt = {0};
int32 available = 0;
gms_tcp_check_connection_null(c_state);
if (in_buffer->max_buffer_size && in_buffer->data) {
c_state->c_info.available_wait = true;
c_state->c_info.available_timeout = 0;
data_opt.report_err = false;
gms_tcp_get_data_from_connection_to_in_buffer(c_state, &data_opt);
available = gms_tcp_get_in_buffer_data_bytes(in_buffer);
if (!available && available_timeout) {
c_state->c_info.available_timeout = available_timeout;
gms_tcp_get_data_from_connection_to_in_buffer(c_state, &data_opt);
available = gms_tcp_get_in_buffer_data_bytes(in_buffer);
}
} else {
c_state->c_info.available_wait = true;
c_state->c_info.available_timeout = available_timeout;
available = gms_tcp_get_available_bytes(c_state);
}
c_state->c_info.available_wait = false;
c_state->c_info.available_timeout = 0;
PG_RETURN_INT32(available);
}
Datum gms_tcp_flush(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
GMS_TCP_CONNECTION_BUFFER *out_buffer = NULL;
errno_t rc = EOK;
gms_tcp_check_connection_null(c_state);
out_buffer = &c_state->out_buffer;
if (!out_buffer->data || !out_buffer->max_buffer_size) {
PG_RETURN_VOID();
}
if (c_state->c_info.tx_timeout) {
int ret = gms_tcp_set_connection_send_timeout(c_state, c_state->c_info.tx_timeout);
if (ret == 0) {
ereport(ERROR,
(errcode(ERRCODE_TRANSFER_TIMEOUT),
errmsg("recv data timeout.")));
} else if (ret < 0) {
ereport(ERROR,
(errcode(ERRCODE_NETWORK_ERROR),
errmsg("recv data error.")));
}
}
send(c_state->fd, out_buffer->data, out_buffer->end, MSG_DONTWAIT);
rc = memset_s(out_buffer->data, GMS_TCP_MAX_OUT_BUFFER_SIZE, 0, out_buffer->max_buffer_size);
securec_check_c(rc, "\0", "\0");
out_buffer->start = 0;
out_buffer->end = 0;
out_buffer->free_space = out_buffer->max_buffer_size;
PG_RETURN_VOID();
}
Datum
gms_tcp_close_all_connections(PG_FUNCTION_ARGS)
{
ListCell* lc = NULL;
if (!GMS_TCP_FD_BAK_LIST) {
PG_RETURN_VOID();
}
foreach (lc, GMS_TCP_FD_BAK_LIST) {
PortalSpecialData *node = (PortalSpecialData *)lfirst(lc);
node->cleanup(node->data);
pfree(node);
}
list_free_ext(GMS_TCP_FD_BAK_LIST);
PG_RETURN_VOID();
}
Datum
gms_tcp_close_connection(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
gms_tcp_check_connection_null(c_state);
gms_tcp_close_connection_by_state(c_state);
PG_RETURN_VOID();
}
Datum
gms_tcp_get_line_real(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
bool remove_crlf = PG_GETARG_BOOL(1);
bool peek = PG_GETARG_BOOL(2);
bool ch_charset = PG_GETARG_BOOL(3);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
GMS_TCP_CONNECTION_DATA_OPT data_opt = {0};
gms_tcp_check_connection_null(c_state);
data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE);
data_opt.remove_crlf = remove_crlf;
data_opt.peek = peek;
data_opt.report_err = true;
c_state->c_info.get_data_wait = true;
gms_tcp_get_data(c_state, &data_opt);
c_state->c_info.get_data_wait = false;
if (ch_charset && c_state->c_info.charset) {
char *data = gms_tcp_encode_data_by_charset(c_state, &data_opt, false);
if (data) {
pfree(data_opt.data);
PG_RETURN_TEXT_P(cstring_to_text(data));
}
}
PG_RETURN_TEXT_P(cstring_to_text(data_opt.data));
}
Datum
gms_tcp_get_raw_real(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
int32 len = PG_GETARG_INT32(1);
bool peek = PG_GETARG_BOOL(2);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
GMS_TCP_CONNECTION_DATA_OPT data_opt = {0};
StringInfoData buf;
gms_tcp_check_connection_null(c_state);
data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE);
data_opt.len = len;
data_opt.peek = peek;
data_opt.report_err = true;
c_state->c_info.get_data_wait = true;
gms_tcp_get_data(c_state, &data_opt);
c_state->c_info.get_data_wait = false;
pq_begintypsend(&buf);
pq_sendbytes(&buf, data_opt.data, data_opt.data_len);
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
Datum
gms_tcp_get_text_real(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
int32 len = PG_GETARG_INT32(1);
bool peek = PG_GETARG_BOOL(2);
bool ch_charset = PG_GETARG_BOOL(3);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
GMS_TCP_CONNECTION_DATA_OPT data_opt = {0};
gms_tcp_check_connection_null(c_state);
data_opt.data = (char *)palloc0(GMS_TCP_MAX_IN_BUFFER_SIZE);
data_opt.len = len;
data_opt.peek = peek;
data_opt.ch_charset = ch_charset;
data_opt.report_err = true;
c_state->c_info.get_data_wait = true;
gms_tcp_get_data(c_state, &data_opt);
c_state->c_info.get_data_wait = false;
if (ch_charset && c_state->c_info.charset) {
char *data = gms_tcp_encode_data_by_charset(c_state, &data_opt, false);
if (data) {
pfree(data_opt.data);
PG_RETURN_TEXT_P(cstring_to_text(data));
}
}
PG_RETURN_TEXT_P(cstring_to_text(data_opt.data));
}
Datum
gms_tcp_open_connection(PG_FUNCTION_ARGS)
{
text *remote_host_t = PG_GETARG_TEXT_P(0);
char *remote_host = NULL;
struct sockaddr_in saddr;
int remote_port = PG_GETARG_INT32(1);
text *local_host_t = PG_GETARG_TEXT_P(2);
char *local_host = NULL;
int local_port = PG_GETARG_INT32(3);
int in_buffer_size = PG_GETARG_INT32(4);
int out_buffer_size = PG_GETARG_INT32(5);
text *charset_t = PG_GETARG_TEXT_P(6);
char *charset = NULL;
text *newline_t = PG_GETARG_TEXT_P(7);
char *newline = NULL;
int tx_timeout = PG_GETARG_INT32(8);
GMS_TCP_CONNECTION_HEAD *c_h = NULL;
GMS_TCP_CONNECTION *c = NULL;
GMS_TCP_CONNECTION_STATE *c_state = NULL;
int offset = 0;
c = (GMS_TCP_CONNECTION *)palloc0(sizeof(GMS_TCP_CONNECTION));
c_h = &c->c_h;
if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(remote_host_t, &remote_host, GMS_TCP_MAX_HOST_LEN, &c_h->remote_host_len, &c_h->total_len)) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("input remote host too long, max length is %d.", GMS_TCP_MAX_HOST_LEN)));
}
if (!remote_host) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("input error remote host.")));
}
if (!gms_tcp_change_remote_host(remote_host, &saddr)) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("input remote host error.")));
}
if (remote_port) {
c_h->remote_port_len = sizeof(c_h->remote_port_len);
c_h->total_len += c_h->remote_port_len;
} else {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("input remote port error.")));
}
if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(local_host_t, &local_host, GMS_TCP_MAX_HOST_LEN, &c_h->local_host_len, &c_h->total_len)) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("input local host too long, max length is %d.", GMS_TCP_MAX_HOST_LEN)));
}
if (local_port) {
c_h->local_port_len = sizeof(c_h->local_port_len);
c_h->total_len += c_h->local_port_len;
}
if (in_buffer_size > GMS_TCP_MAX_IN_BUFFER_SIZE) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("in buffer size must be limited in %d.", GMS_TCP_MAX_IN_BUFFER_SIZE)));
} else if (in_buffer_size) {
c_h->in_buffer_size_len = sizeof(c_h->in_buffer_size_len);
c_h->total_len += c_h->in_buffer_size_len;
}
if (out_buffer_size > GMS_TCP_MAX_OUT_BUFFER_SIZE) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("in buffer size must be limited in %d.", GMS_TCP_MAX_OUT_BUFFER_SIZE)));
} else if (out_buffer_size) {
c_h->out_buffer_size_len = sizeof(c_h->out_buffer_size_len);
c_h->total_len += c_h->out_buffer_size_len;
}
if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(charset_t, &charset, GMS_TCP_MAX_CHARSET_LEN, &c_h->charset_len, &c_h->total_len)) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("input charset too long, max length is %d.", GMS_TCP_MAX_CHARSET_LEN)));
}
if (charset && !gms_tcp_check_charset(charset)) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("error charset: %s.", charset)));
}
if (ORAFCE_COMM_STRING_TOO_LONG == gms_tcp_comm_get_str(newline_t, &newline, 16, &c_h->newline_len, &c_h->total_len)) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("input newline too long, max length is %d.", GMS_TCP_MAX_NEWLINE_LEN)));
}
if (tx_timeout > GMS_TCP_MAX_TX_TIMEOUT || tx_timeout < 0) {
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_BAD_ARGUMENT),
errmsg("tx timeout must be limited in %d ~ %d.", 0, GMS_TCP_MAX_TX_TIMEOUT)));
} else if (tx_timeout) {
c_h->tx_timeout_len = sizeof(c_h->tx_timeout_len);
c_h->total_len += c_h->tx_timeout_len;
}
Assert(c_h->total_len);
gms_tcp_comm_memcopy_str(c->data + offset, c_h->remote_host_len, remote_host, &offset);
gms_tcp_comm_memcopy_int(c->data + offset, c_h->remote_port_len, (char *)&remote_port, c_h->remote_port_len, &offset);
gms_tcp_comm_memcopy_str(c->data + offset, c_h->local_host_len, local_host, &offset);
gms_tcp_comm_memcopy_int(c->data + offset, c_h->local_port_len, (char *)&local_port, c_h->local_port_len, &offset);
gms_tcp_comm_memcopy_int(c->data + offset, c_h->in_buffer_size_len, (char *)&in_buffer_size, c_h->in_buffer_size_len, &offset);
gms_tcp_comm_memcopy_int(c->data + offset, c_h->out_buffer_size_len, (char *)&out_buffer_size, c_h->out_buffer_size_len, &offset);
gms_tcp_comm_memcopy_str(c->data + offset, c_h->charset_len, charset, &offset);
gms_tcp_comm_memcopy_str(c->data + offset, c_h->newline_len, newline, &offset);
gms_tcp_comm_memcopy_int(c->data + offset, c_h->tx_timeout_len, (char *)&tx_timeout, c_h->tx_timeout_len, &offset);
c_state = (GMS_TCP_CONNECTION_STATE *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_STATE));
gms_tcp_get_connection_info(c, &c_state->c_info);
c_state->c_info.saddr = saddr;
gms_tcp_init_data_buffer(&c_state->in_buffer, in_buffer_size);
gms_tcp_init_data_buffer(&c_state->out_buffer, out_buffer_size);
if (gms_tcp_connect(c_state) != GMS_TCP_OK) {
std::shared_ptr<void> data_cstate = std::shared_ptr<void>(c_state);
gms_tcp_release_connection_state(data_cstate);
pfree(c);
ereport(ERROR,
(errcode(ERRCODE_NETWORK_ERROR),
errmsg("connect to remote(remote host: %s, remote port: %d) fail.",
remote_host, remote_port)));
}
c_h->fd_len = sizeof(int);
gms_tcp_comm_memcopy_int(c->data + offset, c_h->fd_len, (char *)&c_state->fd, c_h->fd_len, &offset);
c_h->total_len += c_h->fd_len;
c_state->c_info.fd = c_state->fd;
gms_tcp_store_connection(c_state);
PG_RETURN_POINTER(c);
}
Datum
gms_tcp_write_line(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
text *data_t = PG_GETARG_TEXT_P(1);
char *data = text_to_cstring(data_t);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
char *data_out = NULL;
char *newline = NULL;
int newline_len = 0;
int32 len = 0;
GMS_TCP_CONNECTION_DATA_OPT data_opt = {0};
errno_t rc = EOK;
gms_tcp_check_connection_null(c_state);
data_out = (char *)palloc0(GMS_TCP_MAX_OUT_BUFFER_SIZE);
Assert(c_state->c_info.newline);
if (strcmp(c_state->c_info.newline, "CRLF") == 0) {
newline_len = 2;
newline = "\r\n";
} else {
newline_len = 1;
newline = "\n";
}
rc = memcpy_s(data_out, GMS_TCP_MAX_OUT_BUFFER_SIZE, data, strlen(data));
securec_check_c(rc, "\0", "\0");
data_opt.data = data_out;
data_opt.data_len = strlen(data_out);
rc = memcpy_s(&data_out[data_opt.data_len], GMS_TCP_MAX_OUT_BUFFER_SIZE - data_opt.data_len, newline, newline_len);
securec_check_c(rc, "\0", "\0");
data_opt.data_len += newline_len;
data_opt.ch_charset = true;
data_opt.encrypt = true;
len = gms_tcp_write_data(c_state, &data_opt);
PG_RETURN_INT32(len);
}
Datum
gms_tcp_write_raw_real(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
bytea *wbuf = PG_GETARG_BYTEA_P(1);
int32 write_len = PG_GETARG_INT32(2);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
GMS_TCP_CONNECTION_DATA_OPT data_opt = {0};
int32 len = 0;
gms_tcp_check_connection_null(c_state);
data_opt.data = VARDATA(wbuf);
data_opt.data_len = VARSIZE(wbuf) - VARHDRSZ;
data_opt.len = write_len;
len = gms_tcp_write_data(c_state, &data_opt);
PG_RETURN_INT32(len);
}
Datum
gms_tcp_write_text_real(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
text *data_t = PG_GETARG_TEXT_P(1);
int32 write_len = PG_GETARG_INT32(2);
char *data = text_to_cstring(data_t);
GMS_TCP_CONNECTION_STATE *c_state = gms_tcp_get_connection_state(c);
int32 len = 0;
GMS_TCP_CONNECTION_DATA_OPT data_opt = {0};
gms_tcp_check_connection_null(c_state);
data_opt.data = data;
data_opt.data_len = strlen(data);
data_opt.len = write_len;
data_opt.ch_charset = true;
data_opt.encrypt = false;
len = gms_tcp_write_data(c_state, &data_opt);
PG_RETURN_INT32(len);
}
Datum
gms_tcp_connection_in(PG_FUNCTION_ARGS)
{
PG_RETURN_NULL();
}
Datum
gms_tcp_connection_out(PG_FUNCTION_ARGS)
{
GMS_TCP_CONNECTION *c = (GMS_TCP_CONNECTION *)PG_GETARG_POINTER(0);
GMS_TCP_CONNECTION_INFO *c_info = NULL;
StringInfoData str;
if(!c) {
PG_RETURN_NULL();
}
c_info = (GMS_TCP_CONNECTION_INFO *)gms_tcp_alloc(sizeof(GMS_TCP_CONNECTION_INFO));
gms_tcp_get_connection_info(c, c_info);
initStringInfo(&str);
if (c_info->remote_host) {
appendStringInfo(&str, "remote host:");
appendStringInfo(&str, "%s", c_info->remote_host);
}
if (c_info->remote_port) {
appendStringInfoChar(&str, ',');
appendStringInfo(&str, "remote port:");
appendStringInfo(&str, "%d", c_info->remote_port);
}
if (c_info->local_host) {
appendStringInfo(&str, "local host:");
appendStringInfo(&str, "%s", c_info->local_host);
}
if (c_info->local_port) {
appendStringInfoChar(&str, ',');
appendStringInfo(&str, "local port:");
appendStringInfo(&str, "%d", c_info->local_port);
}
if (c_info->in_buffer_size) {
appendStringInfoChar(&str, ',');
appendStringInfo(&str, "in buffer size:");
appendStringInfo(&str, "%d", c_info->in_buffer_size);
}
if (c_info->out_buffer_size) {
appendStringInfoChar(&str, ',');
appendStringInfo(&str, "out buffer size:");
appendStringInfo(&str, "%d", c_info->out_buffer_size);
}
if (c_info->charset) {
appendStringInfoChar(&str, ',');
appendStringInfo(&str, "charset:");
appendStringInfo(&str, "%s", c_info->charset);
}
if (c_info->newline && (strcmp(c_info->newline, "CRLF") != 0)) {
appendStringInfoChar(&str, ',');
appendStringInfo(&str, "newline:");
appendStringInfo(&str, "%s", c_info->newline);
}
if (c_info->tx_timeout && (c_info->tx_timeout != GMS_TCP_MAX_TX_TIMEOUT)) {
appendStringInfoChar(&str, ',');
appendStringInfo(&str, "tx timeout:");
appendStringInfo(&str, "%d", c_info->tx_timeout);
}
gms_tcp_release_connection_info(c_info);
pfree(c_info);
PG_RETURN_CSTRING(str.data);
}
static void gms_tcp_comm_memcopy_str(char *dst, int dst_len, char *src, int *offset)
{
errno_t rc = EOK;
if (src) {
int len = 0;
int src_len = strlen(src);
rc = memcpy_s(dst, dst_len, src, src_len);
securec_check_c(rc, "\0", "\0");
len += src_len;
dst[src_len] = '\0';
len++;
if (offset) {
*offset += len;
}
}
}
static void gms_tcp_comm_memcopy_int(char *dst, int dst_len, char *src, int src_len, int *offset)
{
errno_t rc = EOK;
if (src_len) {
rc = memcpy_s(dst, dst_len, src, src_len);
securec_check_c(rc, "\0", "\0");
if (offset) {
*offset += src_len;
}
}
}
static int gms_tcp_comm_get_str(text *data_t, char **data, int data_len_max, int *get_data_len, int *total_len)
{
if (data_t) {
char *p = text_to_cstring(data_t);
int data_len = strlen(p) + 1;
if (data_len >= data_len_max) {
return ORAFCE_COMM_STRING_TOO_LONG;
}
if (!(strlen(p) == 1 && p[0] == '0')) {
*get_data_len = data_len;
*total_len += data_len;
*data = p;
}
}
return 0;
}
static bool gms_tcp_comm_cmp_str(const char *str_1, const char *str_2)
{
if ((str_1 && str_2) && (strlen(str_1) == strlen(str_2)) && (memcmp(str_1, str_2, strlen(str_1)) == 0)) {
return true;
} else if (!str_1 && !str_2) {
return true;
} else {
return false;
}
}