!6639 libpq写入避免大型拷贝

Merge pull request !6639 from 王平云/master
This commit is contained in:
opengauss_bot
2024-11-11 08:46:04 +00:00
committed by Gitee
2 changed files with 46 additions and 18 deletions

View File

@ -137,6 +137,7 @@ inline int internal_putbytes(const char* s, size_t len);
/* Internal functions */
int internal_flush(void);
static pg_noinline int internal_flush_buffer(const char *buf, size_t *start, size_t *end);
void pq_set_nonblocking(bool nonblocking);
void pq_disk_generate_checking_header(const char *src_data, StringInfo dest_data, uint32 data_len, uint32 seq_num);
static size_t pq_disk_read_data_block(
@ -1572,8 +1573,6 @@ int pq_putbytes(const char* s, size_t len)
int internal_putbytes(const char* s, size_t len)
{
size_t amount;
while (len > 0) {
/* If buffer is full, then flush it out */
if (t_thrd.libpq_cxt.PqSendPointer >= t_thrd.libpq_cxt.PqSendBufferSize) {
@ -1596,15 +1595,28 @@ int internal_putbytes(const char* s, size_t len)
}
}
}
amount = t_thrd.libpq_cxt.PqSendBufferSize - t_thrd.libpq_cxt.PqSendPointer;
if (amount > len) {
amount = len;
/*
* If the buffer is empty and data length is larger than the buffer
* size, send it without buffering. Otherwise, copy as musch data as
* possible into the buffer
*/
if (len >= t_thrd.libpq_cxt.PqSendBufferSize && t_thrd.libpq_cxt.PqSendPointer == t_thrd.libpq_cxt.PqSendStart) {
size_t start = 0;
pq_set_nonblocking(false);
if (internal_flush_buffer(s, &start, &len)) {
return EOF;
}
} else {
size_t amount = t_thrd.libpq_cxt.PqSendBufferSize - t_thrd.libpq_cxt.PqSendPointer;
if (amount > len) {
amount = len;
}
errno_t rc = memcpy_s(t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendPointer, amount, s, amount);
securec_check(rc, "\0", "\0");
t_thrd.libpq_cxt.PqSendPointer += amount;
s += amount;
len -= amount;
}
errno_t rc = memcpy_s(t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendPointer, amount, s, amount);
securec_check(rc, "\0", "\0");
t_thrd.libpq_cxt.PqSendPointer += amount;
s += amount;
len -= amount;
}
return 0;
@ -1693,6 +1705,22 @@ static void SetConnectionLostFlag()
* --------------------------------
*/
int internal_flush(void)
{
return internal_flush_buffer(
t_thrd.libpq_cxt.PqSendBuffer,
&t_thrd.libpq_cxt.PqSendStart,
&t_thrd.libpq_cxt.PqSendPointer
);
}
/* --------------------------------
* internal_flush_buffer - flush the given buff content
*
* Returns 0 if OK (meaning everything was sent, or operation would block
* and the socket is in non-blocking mode), or EOF if trouble.
* --------------------------------
*/
int internal_flush_buffer(const char *buf, size_t *start, size_t *end)
{
if ((t_thrd.walsender_cxt.ep_fd != -1 && g_comm_proxy_config.s_send_xlog_mode == CommSendXlogWaitIn)) {
return libnet_flush();
@ -1701,8 +1729,8 @@ int internal_flush(void)
static THR_LOCAL int last_reported_send_errno = 0;
errno_t ret;
char* bufptr = t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendStart;
char* bufend = t_thrd.libpq_cxt.PqSendBuffer + t_thrd.libpq_cxt.PqSendPointer;
const char* bufptr = buf + *start;
const char* bufend = buf + *end;
char connTimeInfoStr[INITIAL_EXPBUFFER_SIZE] = {'\0'};
WaitState oldStatus = pgstat_report_waitstatus(STATE_WAIT_UNDEFINED, true);
@ -1734,7 +1762,7 @@ int internal_flush(void)
while (bufptr < bufend) {
int r;
r = secure_write(u_sess->proc_cxt.MyProcPort, bufptr, bufend - bufptr);
r = secure_write(u_sess->proc_cxt.MyProcPort, (char *)bufptr, bufend - bufptr);
if (unlikely(r == 0 && (StreamThreadAmI() == true || u_sess->proc_cxt.MyProcPort->is_logic_conn))) {
/* Stop query when cancel happend */
if (t_thrd.int_cxt.QueryCancelPending) {
@ -1790,7 +1818,7 @@ int internal_flush(void)
* flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate
* the connection.
*/
t_thrd.libpq_cxt.PqSendStart = t_thrd.libpq_cxt.PqSendPointer = 0;
*start = *end = 0;
SetConnectionLostFlag();
(void)pgstat_report_waitstatus(oldStatus);
return EOF;
@ -1798,10 +1826,10 @@ int internal_flush(void)
last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r;
t_thrd.libpq_cxt.PqSendStart += r;
*start += r;
}
t_thrd.libpq_cxt.PqSendStart = t_thrd.libpq_cxt.PqSendPointer = 0;
*start = *end = 0;
(void)pgstat_report_waitstatus(oldStatus);
return 0;
}

View File

@ -2168,9 +2168,9 @@ typedef struct knl_t_libpq_context {
/* Size send buffer */
int PqSendBufferSize;
/* Next index to store a byte in PqSendBuffer */
int PqSendPointer;
size_t PqSendPointer;
/* Next index to send a byte in PqSendBuffer */
int PqSendStart;
size_t PqSendStart;
char* PqRecvBuffer;
/* Size recv buffer */
int PqRecvBufferSize;