diff --git a/src/bin/pg_probackup/data.cpp b/src/bin/pg_probackup/data.cpp index 52e71a19c..bd1e6a970 100644 --- a/src/bin/pg_probackup/data.cpp +++ b/src/bin/pg_probackup/data.cpp @@ -31,6 +31,12 @@ #include "zstd.h" #include "storage/file/fio_device.h" +typedef struct PreReadBuf +{ + int num; + char *data; +} PreReadBuf; + /* Union to ease operations on relation pages */ typedef struct DataPage { @@ -466,7 +472,7 @@ prepare_page(ConnectionArgs *conn_arg, Page page, bool strict, uint32 checksum_version, const char *from_fullpath, - PageState *page_st, PageCompression *pageCompression, int &read_len) + PageState *page_st, PageCompression *pageCompression, int &read_len, PreReadBuf *preReadBuf) { int try_again = PAGE_READ_ATTEMPTS; bool page_is_valid = false; @@ -475,7 +481,10 @@ prepare_page(ConnectionArgs *conn_arg, /* check for interrupt */ if (is_interrupt) + { + pg_free(preReadBuf->data); elog(ERROR, "Interrupted during page reading"); + } /* * Read the page and verify its header and checksum. @@ -486,7 +495,54 @@ prepare_page(ConnectionArgs *conn_arg, while (!page_is_valid && try_again--) { /* read the block */ - read_len = fio_pread(in, page, blknum * BLCKSZ, pageCompression); + int offset = blknum * BLCKSZ; + int fileStartOff = offset - (offset % DSS_BLCKSZ); + if (IsDssMode() && file->size - fileStartOff >= DSS_BLCKSZ) + { + int preReadOff = offset % DSS_BLCKSZ; + if (offset / DSS_BLCKSZ == preReadBuf->num) + { + rc = memcpy_s(page, BLCKSZ, preReadBuf->data + preReadOff, BLCKSZ); + securec_check(rc, "\0", "\0"); + read_len = BLCKSZ; + } + else + { + read_len = fio_pread(in, preReadBuf->data, fileStartOff, pageCompression, DSS_BLCKSZ); + preReadBuf->num = offset / DSS_BLCKSZ; + if (read_len == 0) + { + elog(VERBOSE, "Cannot read block %u of \"%s\": " + "block truncated", offset / DSS_BLCKSZ, from_fullpath); + } + else if (read_len < 0) + { + pg_free(preReadBuf->data); + elog(ERROR, "Cannot read block %u of \"%s\": %s", + offset / DSS_BLCKSZ, from_fullpath, strerror(errno)); + } + else if (read_len != DSS_BLCKSZ) + { + if (read_len > (int)MIN_COMPRESS_ERROR_RT) + { + pg_free(preReadBuf->data); + elog(ERROR, "Cannot read block %u of \"%s\" code: %lu : %s", offset / DSS_BLCKSZ, from_fullpath, read_len, + strerror(errno)); + } + elog(WARNING, + "Cannot read block %u of \"%s\": " + "read %i of %d, try again", + offset / DSS_BLCKSZ, from_fullpath, read_len, DSS_BLCKSZ); + } + rc = memcpy_s(page, BLCKSZ, preReadBuf->data + preReadOff, BLCKSZ); + securec_check(rc, "\0", "\0"); + read_len = BLCKSZ; + } + } + else + { + read_len = fio_pread(in, page, blknum * BLCKSZ, pageCompression, BLCKSZ); + } /* The block could have been truncated. It is fine. */ if (read_len == 0) @@ -496,10 +552,14 @@ prepare_page(ConnectionArgs *conn_arg, return PageIsTruncated; } else if (read_len < 0) + { + pg_free(preReadBuf->data); elog(ERROR, "Cannot read block %u of \"%s\": %s", blknum, from_fullpath, strerror(errno)); + } else if (read_len != BLCKSZ) { if (read_len > (int)MIN_COMPRESS_ERROR_RT) { + pg_free(preReadBuf->data); elog(ERROR, "Cannot read block %u of \"%s\" code: %lu : %s", blknum, from_fullpath, read_len, strerror(errno)); } @@ -573,11 +633,19 @@ prepare_page(ConnectionArgs *conn_arg, elevel = WARNING; if (errormsg) + { + if (elevel == ERROR) + pg_free(preReadBuf->data); elog(elevel, "Corruption detected in file \"%s\", block %u: %s", from_fullpath, blknum, errormsg); + } else + { + if (elevel == ERROR) + pg_free(preReadBuf->data); elog(elevel, "Corruption detected in file \"%s\", block %u", from_fullpath, blknum); + } pg_free(errormsg); return PageIsCorrupted; @@ -1007,6 +1075,13 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers elog(ERROR, "Cannot seek block %u of \"%s\": %s", blknum, to_fullpath, strerror(errno)); + char *preWriteBuf = (char*)malloc(DSS_BLCKSZ); + if (preWriteBuf == NULL) + elog(ERROR, "malloc preWriteBuf failed, size : %d", DSS_BLCKSZ); + int preWriteOff = 0; + int targetSize = file->write_size; + int *p_preWriteOff = &preWriteOff; + int *p_targetSize = &targetSize; for (;;) { off_t write_pos; @@ -1022,7 +1097,10 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers /* check for interrupt */ if (interrupted || thread_interrupted) + { + pg_free(preWriteBuf); elog(ERROR, "Interrupted during data file restore"); + } /* newer backups have headers in separate storage */ if (headers) @@ -1099,14 +1177,23 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers /* To correctly truncate file, we must first flush STDIO buffers */ if (fio_fflush(out) != 0) + { + pg_free(preWriteBuf); elog(ERROR, "Cannot flush file \"%s\": %s", to_fullpath, strerror(errno)); + } /* Set position to the start of file */ if (fio_fseek(out, 0) < 0) + { + pg_free(preWriteBuf); elog(ERROR, "Cannot seek to the start of file \"%s\": %s", to_fullpath, strerror(errno)); + } if (fio_ftruncate(out, blknum * BLCKSZ) != 0) + { + pg_free(preWriteBuf); elog(ERROR, "Cannot truncate file \"%s\": %s", to_fullpath, strerror(errno)); + } break; } @@ -1119,7 +1206,10 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers break; if (compressed_size > BLCKSZ) + { + pg_free(preWriteBuf); elog(ERROR, "Size of a blknum %i exceed BLCKSZ: %i", blknum, compressed_size); + } /* Incremental restore in LSN mode */ if (map && lsn_map && datapagemap_is_set(lsn_map, blknum)) @@ -1147,8 +1237,11 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers * go to the next page. */ if (!headers && fseek(in, read_len, SEEK_CUR) != 0) + { + pg_free(preWriteBuf); elog(ERROR, "Cannot seek block %u of \"%s\": %s", blknum, from_fullpath, strerror(errno)); + } continue; } @@ -1156,8 +1249,11 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers cur_pos_in != headers[n_hdr].pos) { if (fseek(in, headers[n_hdr].pos, SEEK_SET) != 0) + { + pg_free(preWriteBuf); elog(ERROR, "Cannot seek to offset %u of \"%s\": %s", headers[n_hdr].pos, from_fullpath, strerror(errno)); + } cur_pos_in = headers[n_hdr].pos; } @@ -1169,8 +1265,11 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers len = fread(page.data, 1, read_len, in); if (len != read_len) + { + pg_free(preWriteBuf); elog(ERROR, "Cannot read block %u file \"%s\": %s", blknum, from_fullpath, strerror(errno)); + } cur_pos_in += read_len; @@ -1197,8 +1296,11 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers if (cur_pos_out != write_pos) { if (fio_fseek(out, write_pos) < 0) + { + pg_free(preWriteBuf); elog(ERROR, "Cannot seek block %u of \"%s\": %s", blknum, to_fullpath, strerror(errno)); + } cur_pos_out = write_pos; } @@ -1206,20 +1308,62 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers /* If page is compressed and restore is in remote mode, send compressed * page to the remote side. */ - if (is_compressed) + if (IsDssMode() && targetSize >= DSS_BLCKSZ) { - ssize_t rc; - rc = fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg); + if (is_compressed) + { + ssize_t rc; + rc = fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg, to_fullpath, preWriteBuf, p_preWriteOff, p_targetSize); - if (!fio_is_remote_file(out) && rc != BLCKSZ) - elog(ERROR, "Cannot write block %u of \"%s\": %s, size: %u", - blknum, to_fullpath, strerror(errno), compressed_size); + if (!fio_is_remote_file(out) && rc != BLCKSZ) + { + pg_free(preWriteBuf); + elog(ERROR, "Cannot write block %u of preWriteBuf: %s, size: %u", + blknum, strerror(errno), compressed_size); + } + } + else + { + int ret = memcpy_s(preWriteBuf + preWriteOff, DSS_BLCKSZ, page.data, BLCKSZ); + securec_check(ret, "\0", "\0"); + preWriteOff += BLCKSZ; + + if (preWriteOff == DSS_BLCKSZ) + { + if (fio_fwrite(out, preWriteBuf, DSS_BLCKSZ) != DSS_BLCKSZ) + { + pg_free(preWriteBuf); + elog(ERROR, "Cannot write block %u of \"%s\": %s", + blknum, to_fullpath, strerror(errno)); + } + preWriteOff = 0; + targetSize = file->write_size - blknum * BLCKSZ; + } + } } else { - if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ) - elog(ERROR, "Cannot write block %u of \"%s\": %s", - blknum, to_fullpath, strerror(errno)); + if (is_compressed) + { + ssize_t rc; + rc = fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg, NULL, NULL, NULL, NULL); + + if (!fio_is_remote_file(out) && rc != BLCKSZ) + { + pg_free(preWriteBuf); + elog(ERROR, "Cannot write block %u of \"%s\": %s, size: %u", + blknum, to_fullpath, strerror(errno), compressed_size); + } + } + else + { + if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ) + { + pg_free(preWriteBuf); + elog(ERROR, "Cannot write block %u of \"%s\": %s", + blknum, to_fullpath, strerror(errno)); + } + } } write_len += BLCKSZ; @@ -1229,8 +1373,8 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers if (map) datapagemap_add(map, blknum); } - + pg_free(preWriteBuf); return write_len; } @@ -1688,6 +1832,11 @@ check_data_file(ConnectionArgs *arguments, pgFile *file, */ nblocks = file->size/BLCKSZ; + PreReadBuf preReadBuf; + preReadBuf.num = -1; + preReadBuf.data = (char*)malloc(DSS_BLCKSZ); + if (preReadBuf.data == NULL) + elog(ERROR, "malloc preReadBuf.data failed, size : %d", DSS_BLCKSZ); for (blknum = 0; blknum < nblocks; blknum++) { PageState page_st; @@ -1695,7 +1844,7 @@ check_data_file(ConnectionArgs *arguments, pgFile *file, page_state = prepare_page(NULL, file, InvalidXLogRecPtr, blknum, in, BACKUP_MODE_FULL, curr_page, false, checksum_version, - from_fullpath, &page_st, NULL, read_len); + from_fullpath, &page_st, NULL, read_len, &preReadBuf); if (page_state == PageIsTruncated) break; @@ -1710,6 +1859,7 @@ check_data_file(ConnectionArgs *arguments, pgFile *file, } } + pg_free(preReadBuf.data); fclose(in); return is_valid; } @@ -2222,6 +2372,11 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE); } + PreReadBuf preReadBuf; + preReadBuf.num = -1; + preReadBuf.data = (char*)malloc(DSS_BLCKSZ); + if (preReadBuf.data == NULL) + elog(ERROR, "malloc preReadBuf.data failed, size : %d", DSS_BLCKSZ); while (blknum < (BlockNumber)file->n_blocks) { PageState page_st; @@ -2229,7 +2384,7 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f int rc = prepare_page(conn_arg, file, prev_backup_start_lsn, blknum, in, backup_mode, curr_page, true, checksum_version, - from_fullpath, &page_st, pageCompression, read_len); + from_fullpath, &page_st, pageCompression, read_len, &preReadBuf); if (rc == PageIsTruncated) break; @@ -2277,6 +2432,7 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f else blknum++; } + pg_free(preReadBuf.data); /* * Add dummy header, so we can later extract the length of last header diff --git a/src/bin/pg_probackup/file.cpp b/src/bin/pg_probackup/file.cpp index cc0f4d55b..8a4cb3389 100644 --- a/src/bin/pg_probackup/file.cpp +++ b/src/bin/pg_probackup/file.cpp @@ -633,7 +633,7 @@ int fio_truncate(int fd, off_t size) /* * Read file from specified location. */ -int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression) +int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression, int size) { if (fio_is_remote_file(f)) { @@ -660,13 +660,13 @@ int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression) { /* For local file, opened by fopen, we should use stdio functions */ if (pageCompression) { - return (int)pageCompression->ReadCompressedBuffer((BlockNumber)(offs / BLCKSZ), (char*)buf, BLCKSZ, true); + return (int)pageCompression->ReadCompressedBuffer((BlockNumber)(offs / BLCKSZ), (char*)buf, size, true); } else { int rc = fseek(f, offs, SEEK_SET); if (rc < 0) { return rc; } - return fread(buf, 1, BLCKSZ, f); + return fread(buf, 1, size, f); } } } @@ -765,7 +765,7 @@ fio_decompress(void* dst, void const* src, size_t size, int compress_alg) } /* Write data to the file */ -ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg) +ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg, const char *to_fullpath, char *preWriteBuf, int *preWriteOff, int *targetSize) { if (fio_is_remote_file(f)) { @@ -783,13 +783,37 @@ ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compres } else { - /* operate is same in local mode and dss mode */ - char uncompressed_buf[BLCKSZ]; - int32 uncompressed_size = fio_decompress(uncompressed_buf, buf, size, compress_alg); - - return (uncompressed_size < 0) - ? uncompressed_size - : fio_fwrite(f, uncompressed_buf, uncompressed_size); + if (preWriteBuf != NULL) + { + int32 uncompressed_size = fio_decompress(preWriteBuf + (*preWriteOff), buf, size, compress_alg); + *preWriteOff += uncompressed_size; + if (*preWriteOff > DSS_BLCKSZ) + { + pg_free(preWriteBuf); + elog(ERROR, "Offset %d is bigger than preWriteBuf size %d", *preWriteOff, DSS_BLCKSZ); + } + if (*preWriteOff == DSS_BLCKSZ) + { + int write_len = fio_fwrite(f, preWriteBuf, DSS_BLCKSZ); + if (write_len != DSS_BLCKSZ) + { + pg_free(preWriteBuf); + elog(ERROR, "Cannot write block of \"%s\": %s, size: %u", + to_fullpath, strerror(errno), DSS_BLCKSZ); + } + *preWriteOff = 0; + *targetSize -= DSS_BLCKSZ; + } + return uncompressed_size; + } + else + { + char uncompressed_buf[BLCKSZ]; + int32 uncompressed_size = fio_decompress(uncompressed_buf, buf, size, compress_alg); + return (uncompressed_size < 0) + ? uncompressed_size + : fio_fwrite(f, uncompressed_buf, uncompressed_size); + } } } diff --git a/src/bin/pg_probackup/file.h b/src/bin/pg_probackup/file.h index aa465e604..1719d492a 100644 --- a/src/bin/pg_probackup/file.h +++ b/src/bin/pg_probackup/file.h @@ -85,6 +85,7 @@ typedef enum #define IO_CHECK(cmd, size) do { int _rc = (cmd); if (_rc != (int)(size)) fio_error(_rc, size, __FILE__, __LINE__); } while (0) #define FILE_PERMISSIONS 0600 +#define DSS_BLCKSZ 2097152 //2M , In dss mode, the size of operation(read and write) from a DSS file is 2M typedef struct { @@ -112,9 +113,9 @@ extern int fio_get_agent_version(void); extern FILE* fio_fopen(char const* name, char const* mode, fio_location location); extern size_t fio_fwrite(FILE* f, void const* buf, size_t size); extern void fio_construct_compressed(void const* buf, size_t size); -extern ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg); +extern ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg, const char *to_fullpath, char *preWriteBuf, int *preWriteOff, int *targetSize); extern ssize_t fio_fread(FILE* f, void* buf, size_t size); -extern int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression); +extern int fio_pread(FILE* f, void* buf, off_t offs, PageCompression* pageCompression, int size); extern int fio_fprintf(FILE* f, char const* arg, ...);// pg_attribute_printf(2, 3); extern int fio_fflush(FILE* f); extern int fio_fseek(FILE* f, off_t offs);