diff --git a/src/common/backend/libpq/pqcomm.cpp b/src/common/backend/libpq/pqcomm.cpp index 896ba48b0..67febbedc 100644 --- a/src/common/backend/libpq/pqcomm.cpp +++ b/src/common/backend/libpq/pqcomm.cpp @@ -137,6 +137,7 @@ inline int internal_putbytes(const char* s, size_t len); /* Internal functions */ int internal_flush(void); +static pg_noinline int internal_flush_buffer(const char *buf, size_t *start, size_t *end); void pq_set_nonblocking(bool nonblocking); void pq_disk_generate_checking_header(const char *src_data, StringInfo dest_data, uint32 data_len, uint32 seq_num); static size_t pq_disk_read_data_block( @@ -1572,8 +1573,6 @@ int pq_putbytes(const char* s, size_t len) int internal_putbytes(const char* s, size_t len) { - size_t amount; - while (len > 0) { /* If buffer is full, then flush it out */ if (t_thrd.libpq_cxt.PqSendPointer >= t_thrd.libpq_cxt.PqSendBufferSize) { @@ -1596,15 +1595,28 @@ int internal_putbytes(const char* s, size_t len) } } } - amount = t_thrd.libpq_cxt.PqSendBufferSize - t_thrd.libpq_cxt.PqSendPointer; - if (amount > len) { - amount = len; + /* + * If the buffer is empty and data length is larger than the buffer + * size, send it without buffering. Otherwise, copy as musch data as + * possible into the buffer + */ + if (len >= t_thrd.libpq_cxt.PqSendBufferSize && t_thrd.libpq_cxt.PqSendPointer == t_thrd.libpq_cxt.PqSendStart) { + size_t start = 0; + pq_set_nonblocking(false); + if (internal_flush_buffer(s, &start, &len)) { + return EOF; + } + } else { + size_t amount = t_thrd.libpq_cxt.PqSendBufferSize - t_thrd.libpq_cxt.PqSendPointer; + if (amount > len) { + amount = len; + } + errno_t rc = memcpy_s(t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendPointer, amount, s, amount); + securec_check(rc, "\0", "\0"); + t_thrd.libpq_cxt.PqSendPointer += amount; + s += amount; + len -= amount; } - errno_t rc = memcpy_s(t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendPointer, amount, s, amount); - securec_check(rc, "\0", "\0"); - t_thrd.libpq_cxt.PqSendPointer += amount; - s += amount; - len -= amount; } return 0; @@ -1693,6 +1705,22 @@ static void SetConnectionLostFlag() * -------------------------------- */ int internal_flush(void) +{ + return internal_flush_buffer( + t_thrd.libpq_cxt.PqSendBuffer, + &t_thrd.libpq_cxt.PqSendStart, + &t_thrd.libpq_cxt.PqSendPointer + ); +} + +/* -------------------------------- + * internal_flush_buffer - flush the given buff content + * + * Returns 0 if OK (meaning everything was sent, or operation would block + * and the socket is in non-blocking mode), or EOF if trouble. + * -------------------------------- + */ +int internal_flush_buffer(const char *buf, size_t *start, size_t *end) { if ((t_thrd.walsender_cxt.ep_fd != -1 && g_comm_proxy_config.s_send_xlog_mode == CommSendXlogWaitIn)) { return libnet_flush(); @@ -1701,8 +1729,8 @@ int internal_flush(void) static THR_LOCAL int last_reported_send_errno = 0; errno_t ret; - char* bufptr = t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendStart; - char* bufend = t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendPointer; + const char* bufptr = buf + *start; + const char* bufend = buf + *end; char connTimeInfoStr[INITIAL_EXPBUFFER_SIZE] = {'\0'}; WaitState oldStatus = pgstat_report_waitstatus(STATE_WAIT_UNDEFINED, true); @@ -1734,7 +1762,7 @@ int internal_flush(void) while (bufptr < bufend) { int r; - r = secure_write(u_sess->proc_cxt.MyProcPort, bufptr, bufend - bufptr); + r = secure_write(u_sess->proc_cxt.MyProcPort, (char *)bufptr, bufend - bufptr); if (unlikely(r == 0 && (StreamThreadAmI() == true || u_sess->proc_cxt.MyProcPort->is_logic_conn))) { /* Stop query when cancel happend */ if (t_thrd.int_cxt.QueryCancelPending) { @@ -1790,7 +1818,7 @@ int internal_flush(void) * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate * the connection. */ - t_thrd.libpq_cxt.PqSendStart = t_thrd.libpq_cxt.PqSendPointer = 0; + *start = *end = 0; SetConnectionLostFlag(); (void)pgstat_report_waitstatus(oldStatus); return EOF; @@ -1798,10 +1826,10 @@ int internal_flush(void) last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; - t_thrd.libpq_cxt.PqSendStart += r; + *start += r; } - t_thrd.libpq_cxt.PqSendStart = t_thrd.libpq_cxt.PqSendPointer = 0; + *start = *end = 0; (void)pgstat_report_waitstatus(oldStatus); return 0; } diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 3a72f24fd..51581eda6 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2168,9 +2168,9 @@ typedef struct knl_t_libpq_context { /* Size send buffer */ int PqSendBufferSize; /* Next index to store a byte in PqSendBuffer */ - int PqSendPointer; + size_t PqSendPointer; /* Next index to send a byte in PqSendBuffer */ - int PqSendStart; + size_t PqSendStart; char* PqRecvBuffer; /* Size recv buffer */ int PqRecvBufferSize;