diff --git a/src/common/backend/utils/cache/relcache.cpp b/src/common/backend/utils/cache/relcache.cpp
index a202f72c6..a668e3546 100644
--- a/src/common/backend/utils/cache/relcache.cpp
+++ b/src/common/backend/utils/cache/relcache.cpp
@@ -6986,6 +6986,216 @@ struct PublicationActions* GetRelationPublicationActions(Relation relation)
     return pubactions;
 }
 
+#define DSS_BUFFER_IO_SIZE (8192)
+#define MIN_SIZE(a, b) (((a) > (b)) ? (b) : (a))
+
+/*
+ * init_aligned_buffer
+ *      Reset the staging-buffer state in t_thrd.dms_cxt and, under ENABLE_DSS,
+ *      allocate the buffer that fread_wrap()/fwrite_wrap() work on.
+ *
+ * for_input: true when the stream will be read; the buffer is then marked as
+ *      fully consumed (offset == size) so the first fread_wrap() refills it.
+ *      false when the stream will be written; the buffer starts out empty.
+ * stream: the opened init file; only its size is queried here.
+ *
+ * Returns false when the file size cannot be obtained or does not fit in the
+ * int-typed file_size field.  The state is cleared before anything can fail,
+ * so free_aligned_buffer() never sees a stale pointer after a failure.
+ */
+static bool init_aligned_buffer(bool for_input, FILE *stream)
+{
+    knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
+
+    dms_cxt->origin_buf = NULL;
+    dms_cxt->aligned_buf = NULL;
+    dms_cxt->offset = 0;
+    dms_cxt->size = 0;
+    dms_cxt->file_size = 0;
+    if (!ENABLE_DSS) {
+        return true;
+    }
+
+    struct stat s;
+    if (fstat(fileno(stream), &s) < 0) {
+        return false;
+    }
+    /* file_size is an int: reject sizes that the cast would truncate */
+    int file_size = (int)s.st_size;
+    if (file_size < 0 || (off_t)file_size != s.st_size) {
+        return false;
+    }
+
+    dms_cxt->file_size = file_size;
+    /* over-allocate so an aligned window of DSS_BUFFER_IO_SIZE bytes always fits */
+    dms_cxt->origin_buf = (char *)palloc(DSS_BUFFER_IO_SIZE + ALIGNOF_BUFFER);
+    dms_cxt->aligned_buf = (char *)BUFFERALIGN(dms_cxt->origin_buf);
+    dms_cxt->offset = for_input ? DSS_BUFFER_IO_SIZE : 0;
+    dms_cxt->size = DSS_BUFFER_IO_SIZE;
+    return true;
+}
+
+/*
+ * fread_wrap
+ *      fread() stand-in for the init file.  Without ENABLE_DSS it just calls
+ *      fread().  Under ENABLE_DSS the stream is read only in chunks of at most
+ *      DSS_BUFFER_IO_SIZE bytes into the aligned staging buffer, and the
+ *      request is served from that buffer.
+ *
+ * Returns the number of complete items copied to ptr: nmemb on success, fewer
+ * when the file is exhausted or the underlying fread() comes up short, and 0
+ * for an empty request (size == 0 or nmemb == 0), matching fread().
+ */
+static size_t fread_wrap(void *ptr, size_t size, size_t nmemb, FILE *stream)
+{
+    if (!ENABLE_DSS) {
+        return fread(ptr, size, nmemb, stream);
+    }
+    if (size == 0 || nmemb == 0) {
+        return 0;
+    }
+
+    knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
+    char *dest_ptr = (char *)ptr;
+    const size_t total_size = size * nmemb;
+    size_t left_size = total_size;
+
+    while (left_size > 0) {
+        if (dms_cxt->offset >= dms_cxt->size) {
+            /* staging buffer used up: refill it from what is left of the file */
+            if (dms_cxt->file_size <= 0) {
+                return (total_size - left_size) / size;
+            }
+            size_t read_size = (size_t)MIN_SIZE(DSS_BUFFER_IO_SIZE, dms_cxt->file_size);
+            if (fread(dms_cxt->aligned_buf, 1, read_size, stream) != read_size) {
+                return (total_size - left_size) / size;
+            }
+            dms_cxt->offset = 0;
+            dms_cxt->size = (int)read_size;
+            dms_cxt->file_size -= (int)read_size;
+        }
+        size_t copy_size = MIN_SIZE(left_size, (size_t)(dms_cxt->size - dms_cxt->offset));
+        errno_t ret = memcpy_s(dest_ptr, left_size, dms_cxt->aligned_buf + dms_cxt->offset, copy_size);
+        securec_check(ret, "\0", "\0");
+        dest_ptr += copy_size;
+        left_size -= copy_size;
+        dms_cxt->offset += (int)copy_size;
+    }
+    return nmemb;
+}
+
+/*
+ * fwrite_wrap
+ *      fwrite() stand-in for the init file.  Without ENABLE_DSS it just calls
+ *      fwrite().  Under ENABLE_DSS the data is collected in the aligned staging
+ *      buffer, which is written to the stream only once it is completely full;
+ *      the final partial buffer must be pushed out with flush_align_buffer().
+ *
+ * Returns the number of complete items accepted: nitems on success, fewer if
+ * writing a full buffer failed, and 0 for an empty request.
+ */
+static size_t fwrite_wrap(const void *ptr, size_t size, size_t nitems, FILE *stream)
+{
+    if (!ENABLE_DSS) {
+        return fwrite(ptr, size, nitems, stream);
+    }
+    if (size == 0 || nitems == 0) {
+        return 0;
+    }
+
+    knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
+    const char *src_ptr = (const char *)ptr;
+    const size_t total_size = size * nitems;
+    size_t left_size = total_size;
+
+    while (left_size > 0) {
+        if (dms_cxt->offset >= dms_cxt->size) {
+            /* buffer full: hand it to the stream as one whole block */
+            if (fwrite(dms_cxt->aligned_buf, 1, dms_cxt->size, stream) != (size_t)dms_cxt->size) {
+                return (total_size - left_size) / size;
+            }
+            dms_cxt->offset = 0;
+        }
+        size_t copy_size = MIN_SIZE(left_size, (size_t)(dms_cxt->size - dms_cxt->offset));
+        errno_t ret = memcpy_s(dms_cxt->aligned_buf + dms_cxt->offset, dms_cxt->size - dms_cxt->offset,
+                               src_ptr, copy_size);
+        securec_check(ret, "\0", "\0");
+        src_ptr += copy_size;
+        left_size -= copy_size;
+        dms_cxt->offset += (int)copy_size;
+    }
+    return nitems;
+}
+
+/*
+ * flush_align_buffer
+ *      Write out what fwrite_wrap() left in the staging buffer, zero-padded up
+ *      to the next BUFFERALIGN boundary.  Does nothing without ENABLE_DSS or
+ *      when the buffer is empty.
+ *
+ * Under ENABLE_DSS load_relcache_init_file() stops at a zero length word, but
+ * if the padding is shorter than that word its read comes up short and the
+ * whole file is rejected.  In that case one more all-zero aligned block is
+ * appended so the terminating length word is always complete.
+ *
+ * Returns false if an fwrite() comes up short.
+ */
+static bool flush_align_buffer(FILE *stream)
+{
+    if (!ENABLE_DSS) {
+        return true;
+    }
+
+    knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
+    if (dms_cxt->offset == 0) {
+        return true;
+    }
+
+    int aligned_size = (int)BUFFERALIGN(dms_cxt->offset);
+    int pad_size = aligned_size - dms_cxt->offset;
+    Assert(aligned_size <= dms_cxt->size);
+    if (pad_size > 0) {
+        errno_t ret = memset_s(dms_cxt->aligned_buf + dms_cxt->offset, pad_size, 0, pad_size);
+        securec_check(ret, "\0", "\0");
+    }
+    if (fwrite(dms_cxt->aligned_buf, 1, aligned_size, stream) != (size_t)aligned_size) {
+        return false;
+    }
+    /* the buffer is written; a repeated flush must not emit it again */
+    dms_cxt->offset = 0;
+
+    if (pad_size > 0 && (size_t)pad_size < sizeof(Size)) {
+        /* the length word is assumed to be a Size, as written by write_item() */
+        int block_size = (int)BUFFERALIGN(sizeof(Size));
+        Assert(block_size <= dms_cxt->size);
+        errno_t ret = memset_s(dms_cxt->aligned_buf, block_size, 0, block_size);
+        securec_check(ret, "\0", "\0");
+        if (fwrite(dms_cxt->aligned_buf, 1, block_size, stream) != (size_t)block_size) {
+            return false;
+        }
+    }
+    return true;
+}
+
+/*
+ * free_aligned_buffer
+ *      Release the staging buffer through pfree_ext() and clear its
+ *      bookkeeping fields.  Does nothing without ENABLE_DSS.
+ */
+static void free_aligned_buffer()
+{
+    if (!ENABLE_DSS) {
+        return;
+    }
+
+    knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
+    pfree_ext(dms_cxt->origin_buf);
+    dms_cxt->aligned_buf = NULL;
+    dms_cxt->offset = 0;
+    dms_cxt->size = 0;
+    dms_cxt->file_size = 0;
+}
+
 /*
  * load_relcache_init_file,
write_relcache_init_file * @@ -7042,10 +7168,6 @@ struct PublicationActions* GetRelationPublicationActions(Relation relation) */ static bool load_relcache_init_file(bool shared) { - if (ENABLE_DMS) { - return false; - } - FILE* fp = NULL; char initfilename[MAXPGPATH]; Relation* rels = NULL; @@ -7073,8 +7195,15 @@ static bool load_relcache_init_file(bool shared) securec_check_ss(rc, "\0", "\0"); fp = AllocateFile(initfilename, PG_BINARY_R); - if (fp == NULL) + if (fp == NULL) { return false; + } + + if (ENABLE_DSS) { + if (!init_aligned_buffer(true, fp)) { + goto read_failed; + } + } /* * Read the index relcache entries from the file. Note we will not enter @@ -7087,7 +7216,7 @@ static bool load_relcache_init_file(bool shared) nailed_rels = nailed_indexes = 0; /* check for correct magic number (compatible version) */ - if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic)) + if (fread_wrap(&magic, 1, sizeof(magic), fp) != sizeof(magic)) goto read_failed; if (magic != RELCACHE_INIT_FILEMAGIC) goto read_failed; @@ -7101,16 +7230,24 @@ static bool load_relcache_init_file(bool shared) int default_num; /* first read the relation descriptor length */ - nread = fread(&len, 1, sizeof(len), fp); + nread = fread_wrap(&len, 1, sizeof(len), fp); if (nread != sizeof(len)) { if (nread == 0) break; /* end of file */ goto read_failed; } + if (ENABLE_DSS) { + /* we append zero for 512 aligned in the end of file */ + if (len == 0) { + break; // end of file for DSS + } + } + /* safety check for incompatible relcache layout */ - if (len != sizeof(RelationData)) + if (len != sizeof(RelationData)) { goto read_failed; + } /* allocate another relcache header */ if (num_rels >= max_rels) { @@ -7121,18 +7258,18 @@ static bool load_relcache_init_file(bool shared) rel = rels[num_rels++] = (Relation)palloc(len); /* then, read the Relation structure */ - if (fread(rel, 1, len, fp) != len) + if (fread_wrap(rel, 1, len, fp) != len) goto read_failed; /* next read the relation tuple form */ - if 
(fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len != CLASS_TUPLE_SIZE) { goto read_failed; } relform = (Form_pg_class)palloc(len); - if (fread(relform, 1, len, fp) != len) + if (fread_wrap(relform, 1, len, fp) != len) goto read_failed; rel->rd_rel = relform; @@ -7149,11 +7286,11 @@ static bool load_relcache_init_file(bool shared) has_not_null = false; default_num = 0; for (i = 0; i < relform->relnatts; i++) { - if (fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len != ATTRIBUTE_FIXED_PART_SIZE) goto read_failed; - if (fread(rel->rd_att->attrs[i], 1, len, fp) != len) + if (fread_wrap(rel->rd_att->attrs[i], 1, len, fp) != len) goto read_failed; has_not_null = has_not_null || rel->rd_att->attrs[i]->attnotnull; @@ -7172,11 +7309,11 @@ static bool load_relcache_init_file(bool shared) } /* next read the access method specific field */ - if (fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len > 0 && len < MaxAllocSize) { rel->rd_options = (bytea*)palloc(len); - if (fread(rel->rd_options, 1, len, fp) != len) + if (fread_wrap(rel->rd_options, 1, len, fp) != len) goto read_failed; if (len != VARSIZE(rel->rd_options)) goto read_failed; /* sanity check */ @@ -7212,13 +7349,13 @@ static bool load_relcache_init_file(bool shared) nailed_indexes++; /* next, read the pg_index tuple */ - if (fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len > HEAPTUPLESIZE + MaxIndexTuplesPerPage) { goto read_failed; } rel->rd_indextuple = (HeapTuple)heaptup_alloc(len); - if (fread(rel->rd_indextuple, 1, len, fp) != len) + if (fread_wrap(rel->rd_indextuple, 1, len, fp) != len) goto read_failed; /* Fix up internal pointers in the tuple -- see heap_copytuple */ @@ -7227,14 
+7364,14 @@ static bool load_relcache_init_file(bool shared) IndexRelationInitKeyNums(rel); /* next, read the access method tuple form */ - if (fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len != sizeof(FormData_pg_am)) { goto read_failed; } am = (Form_pg_am)palloc(len); - if (fread(am, 1, len, fp) != len) + if (fread_wrap(am, 1, len, fp) != len) goto read_failed; rel->rd_am = am; @@ -7250,62 +7387,62 @@ static bool load_relcache_init_file(bool shared) rel->rd_indexcxt = indexcxt; /* next, read the vector of opfamily OIDs */ - if (fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len > relform->relnatts * sizeof(Oid)) { goto read_failed; } opfamily = (Oid*)MemoryContextAlloc(indexcxt, len); - if (fread(opfamily, 1, len, fp) != len) + if (fread_wrap(opfamily, 1, len, fp) != len) goto read_failed; rel->rd_opfamily = opfamily; /* next, read the vector of opcintype OIDs */ - if (fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len > relform->relnatts * sizeof(Oid)) { goto read_failed; } opcintype = (Oid*)MemoryContextAlloc(indexcxt, len); - if (fread(opcintype, 1, len, fp) != len) + if (fread_wrap(opcintype, 1, len, fp) != len) goto read_failed; rel->rd_opcintype = opcintype; /* next, read the vector of support procedure OIDs */ - if (fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len > relform->relnatts * (am->amsupport * sizeof(RegProcedure))) { goto read_failed; } support = (RegProcedure*)MemoryContextAlloc(indexcxt, len); - if (fread(support, 1, len, fp) != len) + if (fread_wrap(support, 1, len, fp) != len) goto read_failed; rel->rd_support = support; /* next, read the vector of collation OIDs */ - if (fread(&len, 1, sizeof(len), fp) != 
sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len > relform->relnatts * sizeof(Oid)) { goto read_failed; } indcollation = (Oid*)MemoryContextAlloc(indexcxt, len); - if (fread(indcollation, 1, len, fp) != len) + if (fread_wrap(indcollation, 1, len, fp) != len) goto read_failed; rel->rd_indcollation = indcollation; /* finally, read the vector of indoption values */ - if (fread(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) goto read_failed; if (len > relform->relnatts * sizeof(int16)) { goto read_failed; } indoption = (int16*)MemoryContextAlloc(indexcxt, len); - if (fread(indoption, 1, len, fp) != len) + if (fread_wrap(indoption, 1, len, fp) != len) goto read_failed; rel->rd_indoption = indoption; @@ -7417,6 +7554,10 @@ static bool load_relcache_init_file(bool shared) pfree_ext(rels); FreeFile(fp); + if (ENABLE_DSS) { + free_aligned_buffer(); + } + if (shared) u_sess->relcache_cxt.criticalSharedRelcachesBuilt = true; else @@ -7432,6 +7573,10 @@ read_failed: pfree_ext(rels); FreeFile(fp); + if (ENABLE_DSS) { + free_aligned_buffer(); + } + return false; } @@ -7441,7 +7586,7 @@ read_failed: */ static void write_relcache_init_file(bool shared) { - if (ENABLE_DSS || SS_STANDBY_MODE) { + if (SS_STANDBY_MODE) { return; } @@ -7522,12 +7667,18 @@ static void write_relcache_init_file(bool shared) return; } + if (ENABLE_DSS) { + if (!init_aligned_buffer(false, fp)) { + ereport(FATAL, (errmsg("could not write init file"))); + } + } + /* * Write a magic number to serve as a file version identifier. We can * change the magic number whenever the relcache layout changes. 
*/ magic = RELCACHE_INIT_FILEMAGIC; - if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic)) + if (fwrite_wrap(&magic, 1, sizeof(magic), fp) != sizeof(magic)) ereport(FATAL, (errmsg("could not write init file"))); /* @@ -7593,6 +7744,13 @@ static void write_relcache_init_file(bool shared) } } + if (ENABLE_DSS) { + if (!flush_align_buffer(fp)) { + ereport(FATAL, (errmsg("could not write init file"))); + } + free_aligned_buffer(); + } + if (FreeFile(fp)) ereport(FATAL, (errmsg("could not write init file"))); @@ -7639,9 +7797,9 @@ static void write_relcache_init_file(bool shared) /* write a chunk of data preceded by its length */ static void write_item(const void* data, Size len, FILE* fp) { - if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len)) + if (fwrite_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) /* the sizeof(len)-byte length prefix goes out first, through fwrite_wrap like the payload below */ ereport(FATAL, (errmsg("could not write init file"))); - if (fwrite(data, 1, len, fp) != len) + if (fwrite_wrap(data, 1, len, fp) != len) /* then the len payload bytes; any count other than len is reported as FATAL */ ereport(FATAL, (errmsg("could not write init file"))); } diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index 2cf1cbdf2..538aa893a 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1689,6 +1689,19 @@ static void knl_t_sql_patch_init(knl_t_sql_patch_context* sql_patch_cxt) sql_patch_cxt->sql_patch_prev_post_parse_analyze_hook = NULL; } +static void knl_t_dms_context_init(knl_t_dms_context *dms_cxt) /* clears the DMS thread context: pointers to NULL, flags to false, buffer counters to 0; invoked from knl_thread_init */ +{ + dms_cxt->msgContext = NULL; + dms_cxt->buf_in_aio = false; + dms_cxt->is_reform_proc = false; + dms_cxt->CloseAllSessionsFailed = false; + dms_cxt->origin_buf = NULL; /* the staging buffer fields below start out as "no buffer" */ + dms_cxt->aligned_buf = NULL; + dms_cxt->offset = 0; + dms_cxt->size = 0; + dms_cxt->file_size = 0; +} + #ifdef ENABLE_MOT static void knl_t_mot_init(knl_t_mot_context* mot_cxt) { @@ -1870,6 +1883,7 @@ void knl_thread_init(knl_thread_role role) knl_t_bgworker_init(&t_thrd.bgworker_cxt); 
knl_index_advisor_init(&t_thrd.index_advisor_cxt); knl_t_sql_patch_init(&t_thrd.sql_patch_cxt); + knl_t_dms_context_init(&t_thrd.dms_cxt); KnlTApplyLauncherInit(&t_thrd.applylauncher_cxt); KnlTApplyWorkerInit(&t_thrd.applyworker_cxt); KnlTPublicationInit(&t_thrd.publication_cxt); diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 42b4585f3..a4a7cc80b 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -3332,6 +3332,11 @@ typedef struct knl_t_dms_context { bool buf_in_aio; bool is_reform_proc; bool CloseAllSessionsFailed; + char *origin_buf; /* origin buffer for unaligned read/write */ + char *aligned_buf; + int size; /* aligned buffer size */ + int offset; /* current read/write position in aligned_buf */ + int file_size; /* initialized as pg_internal.init file size, will decrease after read */ } knl_t_dms_context; /* thread context. */