修改资源池化场景下gs_probackup工具中DSS读写步长,优化性能

This commit is contained in:
liuzhanfeng2
2023-09-19 09:20:27 +08:00
parent 4a6307f928
commit b3fc0eec8e
3 changed files with 208 additions and 27 deletions

View File

@ -31,6 +31,12 @@
#include "zstd.h"
#include "storage/file/fio_device.h"
typedef struct PreReadBuf
{
int num;
char *data;
} PreReadBuf;
/* Union to ease operations on relation pages */
typedef struct DataPage
{
@ -466,7 +472,7 @@ prepare_page(ConnectionArgs *conn_arg,
Page page, bool strict,
uint32 checksum_version,
const char *from_fullpath,
PageState *page_st, PageCompression *pageCompression, int &read_len)
PageState *page_st, PageCompression *pageCompression, int &read_len, PreReadBuf *preReadBuf)
{
int try_again = PAGE_READ_ATTEMPTS;
bool page_is_valid = false;
@ -475,7 +481,10 @@ prepare_page(ConnectionArgs *conn_arg,
/* check for interrupt */
if (is_interrupt)
{
pg_free(preReadBuf->data);
elog(ERROR, "Interrupted during page reading");
}
/*
* Read the page and verify its header and checksum.
@ -486,7 +495,54 @@ prepare_page(ConnectionArgs *conn_arg,
while (!page_is_valid && try_again--)
{
/* read the block */
read_len = fio_pread(in, page, blknum * BLCKSZ, pageCompression);
int offset = blknum * BLCKSZ;
int fileStartOff = offset - (offset % DSS_BLCKSZ);
if (IsDssMode() && file->size - fileStartOff >= DSS_BLCKSZ)
{
int preReadOff = offset % DSS_BLCKSZ;
if (offset / DSS_BLCKSZ == preReadBuf->num)
{
rc = memcpy_s(page, BLCKSZ, preReadBuf->data + preReadOff, BLCKSZ);
securec_check(rc, "\0", "\0");
read_len = BLCKSZ;
}
else
{
read_len = fio_pread(in, preReadBuf->data, fileStartOff, pageCompression, DSS_BLCKSZ);
preReadBuf->num = offset / DSS_BLCKSZ;
if (read_len == 0)
{
elog(VERBOSE, "Cannot read block %u of \"%s\": "
"block truncated", offset / DSS_BLCKSZ, from_fullpath);
}
else if (read_len < 0)
{
pg_free(preReadBuf->data);
elog(ERROR, "Cannot read block %u of \"%s\": %s",
offset / DSS_BLCKSZ, from_fullpath, strerror(errno));
}
else if (read_len != DSS_BLCKSZ)
{
if (read_len > (int)MIN_COMPRESS_ERROR_RT)
{
pg_free(preReadBuf->data);
elog(ERROR, "Cannot read block %u of \"%s\" code: %lu : %s", offset / DSS_BLCKSZ, from_fullpath, read_len,
strerror(errno));
}
elog(WARNING,
"Cannot read block %u of \"%s\": "
"read %i of %d, try again",
offset / DSS_BLCKSZ, from_fullpath, read_len, DSS_BLCKSZ);
}
rc = memcpy_s(page, BLCKSZ, preReadBuf->data + preReadOff, BLCKSZ);
securec_check(rc, "\0", "\0");
read_len = BLCKSZ;
}
}
else
{
read_len = fio_pread(in, page, blknum * BLCKSZ, pageCompression, BLCKSZ);
}
/* The block could have been truncated. It is fine. */
if (read_len == 0)
@ -496,10 +552,14 @@ prepare_page(ConnectionArgs *conn_arg,
return PageIsTruncated;
}
else if (read_len < 0)
{
pg_free(preReadBuf->data);
elog(ERROR, "Cannot read block %u of \"%s\": %s",
blknum, from_fullpath, strerror(errno));
}
else if (read_len != BLCKSZ) {
if (read_len > (int)MIN_COMPRESS_ERROR_RT) {
pg_free(preReadBuf->data);
elog(ERROR, "Cannot read block %u of \"%s\" code: %lu : %s", blknum, from_fullpath, read_len,
strerror(errno));
}
@ -573,11 +633,19 @@ prepare_page(ConnectionArgs *conn_arg,
elevel = WARNING;
if (errormsg)
{
if (elevel == ERROR)
pg_free(preReadBuf->data);
elog(elevel, "Corruption detected in file \"%s\", block %u: %s",
from_fullpath, blknum, errormsg);
}
else
{
if (elevel == ERROR)
pg_free(preReadBuf->data);
elog(elevel, "Corruption detected in file \"%s\", block %u",
from_fullpath, blknum);
}
pg_free(errormsg);
return PageIsCorrupted;
@ -1007,6 +1075,13 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
char *preWriteBuf = (char*)malloc(DSS_BLCKSZ);
if (preWriteBuf == NULL)
elog(ERROR, "malloc preWriteBuf failed, size : %d", DSS_BLCKSZ);
int preWriteOff = 0;
int targetSize = file->write_size;
int *p_preWriteOff = &preWriteOff;
int *p_targetSize = &targetSize;
for (;;)
{
off_t write_pos;
@ -1022,7 +1097,10 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
/* check for interrupt */
if (interrupted || thread_interrupted)
{
pg_free(preWriteBuf);
elog(ERROR, "Interrupted during data file restore");
}
/* newer backups have headers in separate storage */
if (headers)
@ -1099,14 +1177,23 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
/* To correctly truncate file, we must first flush STDIO buffers */
if (fio_fflush(out) != 0)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot flush file \"%s\": %s", to_fullpath, strerror(errno));
}
/* Set position to the start of file */
if (fio_fseek(out, 0) < 0)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot seek to the start of file \"%s\": %s", to_fullpath, strerror(errno));
}
if (fio_ftruncate(out, blknum * BLCKSZ) != 0)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot truncate file \"%s\": %s", to_fullpath, strerror(errno));
}
break;
}
@ -1119,7 +1206,10 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
break;
if (compressed_size > BLCKSZ)
{
pg_free(preWriteBuf);
elog(ERROR, "Size of a blknum %i exceed BLCKSZ: %i", blknum, compressed_size);
}
/* Incremental restore in LSN mode */
if (map && lsn_map && datapagemap_is_set(lsn_map, blknum))
@ -1147,8 +1237,11 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
* go to the next page.
*/
if (!headers && fseek(in, read_len, SEEK_CUR) != 0)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, from_fullpath, strerror(errno));
}
continue;
}
@ -1156,8 +1249,11 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
cur_pos_in != headers[n_hdr].pos)
{
if (fseek(in, headers[n_hdr].pos, SEEK_SET) != 0)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot seek to offset %u of \"%s\": %s",
headers[n_hdr].pos, from_fullpath, strerror(errno));
}
cur_pos_in = headers[n_hdr].pos;
}
@ -1169,8 +1265,11 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
len = fread(page.data, 1, read_len, in);
if (len != read_len)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot read block %u file \"%s\": %s",
blknum, from_fullpath, strerror(errno));
}
cur_pos_in += read_len;
@ -1197,8 +1296,11 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
if (cur_pos_out != write_pos)
{
if (fio_fseek(out, write_pos) < 0)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}
cur_pos_out = write_pos;
}
@ -1206,20 +1308,62 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
/* If page is compressed and restore is in remote mode, send compressed
* page to the remote side.
*/
if (is_compressed)
if (IsDssMode() && targetSize >= DSS_BLCKSZ)
{
ssize_t rc;
rc = fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg);
if (is_compressed)
{
ssize_t rc;
rc = fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg, to_fullpath, preWriteBuf, p_preWriteOff, p_targetSize);
if (!fio_is_remote_file(out) && rc != BLCKSZ)
elog(ERROR, "Cannot write block %u of \"%s\": %s, size: %u",
blknum, to_fullpath, strerror(errno), compressed_size);
if (!fio_is_remote_file(out) && rc != BLCKSZ)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot write block %u of preWriteBuf: %s, size: %u",
blknum, strerror(errno), compressed_size);
}
}
else
{
int ret = memcpy_s(preWriteBuf + preWriteOff, DSS_BLCKSZ, page.data, BLCKSZ);
securec_check(ret, "\0", "\0");
preWriteOff += BLCKSZ;
if (preWriteOff == DSS_BLCKSZ)
{
if (fio_fwrite(out, preWriteBuf, DSS_BLCKSZ) != DSS_BLCKSZ)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}
preWriteOff = 0;
targetSize = file->write_size - blknum * BLCKSZ;
}
}
}
else
{
if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ)
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
if (is_compressed)
{
ssize_t rc;
rc = fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg, NULL, NULL, NULL, NULL);
if (!fio_is_remote_file(out) && rc != BLCKSZ)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot write block %u of \"%s\": %s, size: %u",
blknum, to_fullpath, strerror(errno), compressed_size);
}
}
else
{
if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot write block %u of \"%s\": %s",
blknum, to_fullpath, strerror(errno));
}
}
}
write_len += BLCKSZ;
@ -1229,8 +1373,8 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
if (map)
datapagemap_add(map, blknum);
}
pg_free(preWriteBuf);
return write_len;
}
@ -1688,6 +1832,11 @@ check_data_file(ConnectionArgs *arguments, pgFile *file,
*/
nblocks = file->size/BLCKSZ;
PreReadBuf preReadBuf;
preReadBuf.num = -1;
preReadBuf.data = (char*)malloc(DSS_BLCKSZ);
if (preReadBuf.data == NULL)
elog(ERROR, "malloc preReadBuf.data failed, size : %d", DSS_BLCKSZ);
for (blknum = 0; blknum < nblocks; blknum++)
{
PageState page_st;
@ -1695,7 +1844,7 @@ check_data_file(ConnectionArgs *arguments, pgFile *file,
page_state = prepare_page(NULL, file, InvalidXLogRecPtr,
blknum, in, BACKUP_MODE_FULL,
curr_page, false, checksum_version,
from_fullpath, &page_st, NULL, read_len);
from_fullpath, &page_st, NULL, read_len, &preReadBuf);
if (page_state == PageIsTruncated)
break;
@ -1710,6 +1859,7 @@ check_data_file(ConnectionArgs *arguments, pgFile *file,
}
}
pg_free(preReadBuf.data);
fclose(in);
return is_valid;
}
@ -2222,6 +2372,11 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
}
PreReadBuf preReadBuf;
preReadBuf.num = -1;
preReadBuf.data = (char*)malloc(DSS_BLCKSZ);
if (preReadBuf.data == NULL)
elog(ERROR, "malloc preReadBuf.data failed, size : %d", DSS_BLCKSZ);
while (blknum < (BlockNumber)file->n_blocks)
{
PageState page_st;
@ -2229,7 +2384,7 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f
int rc = prepare_page(conn_arg, file, prev_backup_start_lsn,
blknum, in, backup_mode, curr_page,
true, checksum_version,
from_fullpath, &page_st, pageCompression, read_len);
from_fullpath, &page_st, pageCompression, read_len, &preReadBuf);
if (rc == PageIsTruncated)
break;
@ -2277,6 +2432,7 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f
else
blknum++;
}
pg_free(preReadBuf.data);
/*
* Add dummy header, so we can later extract the length of last header

View File

@ -633,7 +633,7 @@ int fio_truncate(int fd, off_t size)
/*
* Read file from specified location.
*/
int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression)
int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression, int size)
{
if (fio_is_remote_file(f))
{
@ -660,13 +660,13 @@ int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression)
{
/* For local file, opened by fopen, we should use stdio functions */
if (pageCompression) {
return (int)pageCompression->ReadCompressedBuffer((BlockNumber)(offs / BLCKSZ), (char*)buf, BLCKSZ, true);
return (int)pageCompression->ReadCompressedBuffer((BlockNumber)(offs / BLCKSZ), (char*)buf, size, true);
} else {
int rc = fseek(f, offs, SEEK_SET);
if (rc < 0) {
return rc;
}
return fread(buf, 1, BLCKSZ, f);
return fread(buf, 1, size, f);
}
}
}
@ -765,7 +765,7 @@ fio_decompress(void* dst, void const* src, size_t size, int compress_alg)
}
/* Write data to the file */
ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg)
ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg, const char *to_fullpath, char *preWriteBuf, int *preWriteOff, int *targetSize)
{
if (fio_is_remote_file(f))
{
@ -783,13 +783,37 @@ ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compres
}
else
{
/* operate is same in local mode and dss mode */
char uncompressed_buf[BLCKSZ];
int32 uncompressed_size = fio_decompress(uncompressed_buf, buf, size, compress_alg);
return (uncompressed_size < 0)
? uncompressed_size
: fio_fwrite(f, uncompressed_buf, uncompressed_size);
if (preWriteBuf != NULL)
{
int32 uncompressed_size = fio_decompress(preWriteBuf + (*preWriteOff), buf, size, compress_alg);
*preWriteOff += uncompressed_size;
if (*preWriteOff > DSS_BLCKSZ)
{
pg_free(preWriteBuf);
elog(ERROR, "Offset %d is bigger than preWriteBuf size %d", *preWriteOff, DSS_BLCKSZ);
}
if (*preWriteOff == DSS_BLCKSZ)
{
int write_len = fio_fwrite(f, preWriteBuf, DSS_BLCKSZ);
if (write_len != DSS_BLCKSZ)
{
pg_free(preWriteBuf);
elog(ERROR, "Cannot write block of \"%s\": %s, size: %u",
to_fullpath, strerror(errno), DSS_BLCKSZ);
}
*preWriteOff = 0;
*targetSize -= DSS_BLCKSZ;
}
return uncompressed_size;
}
else
{
char uncompressed_buf[BLCKSZ];
int32 uncompressed_size = fio_decompress(uncompressed_buf, buf, size, compress_alg);
return (uncompressed_size < 0)
? uncompressed_size
: fio_fwrite(f, uncompressed_buf, uncompressed_size);
}
}
}

View File

@ -85,6 +85,7 @@ typedef enum
#define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (int)(size)) fio_error(_rc, size, __FILE__, __LINE__); } while (0)
#define FILE_PERMISSIONS 0600
#define DSS_BLCKSZ 2097152 //2M , In dss mode, the size of operation(read and write) from a DSS file is 2M
typedef struct
{
@ -112,9 +113,9 @@ extern int fio_get_agent_version(void);
extern FILE* fio_fopen(char const* name, char const* mode, fio_location location);
extern size_t fio_fwrite(FILE* f, void const* buf, size_t size);
extern void fio_construct_compressed(void const* buf, size_t size);
extern ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg);
extern ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg, const char *to_fullpath, char *preWriteBuf, int *preWriteOff, int *targetSize);
extern ssize_t fio_fread(FILE* f, void* buf, size_t size);
extern int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression);
extern int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression, int size);
extern int fio_fprintf(FILE* f, char const* arg, ...);// pg_attribute_printf(2, 3);
extern int fio_fflush(FILE* f);
extern int fio_fseek(FILE* f, off_t offs);