!2928 1 load_relcache_init_file/write_relcache_init_file 使用512对齐的缓存读写,规避对接DSS后无法非512读写

Merge pull request !2928 from 陈超/master
This commit is contained in:
opengauss-bot
2023-02-21 06:20:19 +00:00
committed by Gitee
3 changed files with 210 additions and 33 deletions

View File

@ -6986,6 +6986,132 @@ struct PublicationActions* GetRelationPublicationActions(Relation relation)
return pubactions;
}
/* Size in bytes of the staging buffer used by the aligned read/write wrappers in this file. */
#define DSS_BUFFER_IO_SIZE (8192)
/*
 * Smaller of a and b.  The argument that is selected gets evaluated twice,
 * so only pass expressions without side effects.
 */
#define MIN_SIZE(a, b) (((a) > (b)) ? (b) : (a))
/*
 * init_aligned_buffer
 *      Prepare the per-thread staging state in t_thrd.dms_cxt that
 *      fread_wrap()/fwrite_wrap() operate on.
 *
 * for_input: true when the stream is about to be read, false when it is
 *            about to be written.
 * stream:    the open file; it is only inspected (through fstat) when
 *            ENABLE_DSS is set.
 *
 * Returns false only when fstat() on the stream fails, true otherwise.
 *
 * The context is reset to the inactive state before anything that can fail
 * is attempted.  Callers run free_aligned_buffer() on the failure path as
 * well, and that function hands origin_buf to pfree_ext(); without the reset
 * a pointer left behind by an earlier operation that never reached
 * free_aligned_buffer() would be released there.
 * NOTE(review): the stale-pointer scenario (e.g. an error raised half-way
 * through a previous load) is presumed, not observed - confirm.
 */
static bool init_aligned_buffer(bool for_input, FILE *stream)
{
    knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;

    /* Inactive state; also the final state when DSS is not in use. */
    dms_cxt->origin_buf = NULL;
    dms_cxt->aligned_buf = NULL;
    dms_cxt->offset = 0;
    dms_cxt->size = 0;
    dms_cxt->file_size = 0;

    if (!(ENABLE_DSS)) {
        return true;
    }

    struct stat s;
    int fd = fileno(stream);
    if (fstat(fd, &s) < 0) {
        return false;
    }

    /* Remaining bytes to read; fread_wrap() decrements this as it refills. */
    dms_cxt->file_size = (int)s.st_size;

    /* Over-allocate so that an ALIGNOF_BUFFER-aligned window fits inside. */
    dms_cxt->origin_buf = (char *)palloc(DSS_BUFFER_IO_SIZE + ALIGNOF_BUFFER);
    dms_cxt->aligned_buf = (char *)BUFFERALIGN(dms_cxt->origin_buf);

    /*
     * For input the buffer starts out "fully consumed" so that the first
     * fread_wrap() call refills it; for output it starts out empty.
     */
    dms_cxt->offset = (for_input) ? DSS_BUFFER_IO_SIZE : 0;
    dms_cxt->size = DSS_BUFFER_IO_SIZE;
    return true;
}
static size_t fread_wrap(void *ptr, size_t size, size_t nmemb, FILE *stream)
{
if (ENABLE_DSS) {
errno_t ret;
knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
char *dest_ptr = (char *)ptr;
size_t copy_size, read_size;
size_t left_size = size * nmemb;
while (left_size > 0) {
if (dms_cxt->offset >= dms_cxt->size) {
read_size = (size_t)MIN_SIZE(DSS_BUFFER_IO_SIZE, dms_cxt->file_size);
if (read_size == 0) {
return (size * nmemb - left_size) / size;
}
if (fread(dms_cxt->aligned_buf, 1, read_size, stream) != read_size) {
return (size * nmemb - left_size) / size;
}
dms_cxt->offset = 0;
dms_cxt->size = (int)read_size;
dms_cxt->file_size -= (int)read_size;
}
copy_size = MIN_SIZE(left_size, (size_t)(dms_cxt->size - dms_cxt->offset));
ret = memcpy_s(dest_ptr, left_size, dms_cxt->aligned_buf + dms_cxt->offset, copy_size);
securec_check(ret, "\0", "\0");
dest_ptr += copy_size;
left_size -= copy_size;
dms_cxt->offset += (int)copy_size;
}
return nmemb;
} else {
return fread(ptr, size, nmemb, stream);
}
}
static size_t fwrite_wrap(const void *ptr, size_t size, size_t nitems, FILE *stream)
{
if (ENABLE_DSS) {
errno_t ret;
knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
char *src_ptr = (char *)ptr;
size_t copy_size;
size_t left_size = size * nitems;
while (left_size > 0) {
if (dms_cxt->offset >= dms_cxt->size) {
if (fwrite(dms_cxt->aligned_buf, 1, dms_cxt->size, stream) != (size_t)dms_cxt->size) {
return (size * nitems - left_size) / size;
}
dms_cxt->offset = 0;
}
copy_size = MIN_SIZE(left_size, (size_t)(dms_cxt->size - dms_cxt->offset));
ret = memcpy_s(dms_cxt->aligned_buf + dms_cxt->offset, dms_cxt->size - dms_cxt->offset,
src_ptr, copy_size);
securec_check(ret, "\0", "\0");
src_ptr += copy_size;
left_size -= copy_size;
dms_cxt->offset += (int)copy_size;
}
return nitems;
} else {
return fwrite(ptr, size, nitems, stream);
}
}
static bool flush_align_buffer(FILE *stream)
{
if (ENABLE_DSS) {
knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
if (dms_cxt->offset == 0) {
return true;
} else {
int aligned_size = (int)BUFFERALIGN(dms_cxt->offset);
Assert(aligned_size <= dms_cxt->size);
if (dms_cxt->offset < aligned_size) {
errno_t ret = memset_s(dms_cxt->aligned_buf + dms_cxt->offset, aligned_size - dms_cxt->offset,
0, aligned_size - dms_cxt->offset);
securec_check(ret, "\0", "\0");
}
size_t num = fwrite(dms_cxt->aligned_buf, 1, aligned_size, stream);
return (((int)num) == aligned_size);
}
} else {
return true;
}
}
static void free_aligned_buffer()
{
if (ENABLE_DSS) {
knl_t_dms_context *dms_cxt = &t_thrd.dms_cxt;
pfree_ext(dms_cxt->origin_buf);
dms_cxt->aligned_buf = NULL;
dms_cxt->offset = 0;
dms_cxt->size = 0;
dms_cxt->file_size = 0;
}
}
/*
* load_relcache_init_file, write_relcache_init_file
*
@ -7042,10 +7168,6 @@ struct PublicationActions* GetRelationPublicationActions(Relation relation)
*/
static bool load_relcache_init_file(bool shared)
{
if (ENABLE_DMS) {
return false;
}
FILE* fp = NULL;
char initfilename[MAXPGPATH];
Relation* rels = NULL;
@ -7073,8 +7195,15 @@ static bool load_relcache_init_file(bool shared)
securec_check_ss(rc, "\0", "\0");
fp = AllocateFile(initfilename, PG_BINARY_R);
if (fp == NULL)
if (fp == NULL) {
return false;
}
if (ENABLE_DSS) {
if (!init_aligned_buffer(true, fp)) {
goto read_failed;
}
}
/*
* Read the index relcache entries from the file. Note we will not enter
@ -7087,7 +7216,7 @@ static bool load_relcache_init_file(bool shared)
nailed_rels = nailed_indexes = 0;
/* check for correct magic number (compatible version) */
if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
if (fread_wrap(&magic, 1, sizeof(magic), fp) != sizeof(magic))
goto read_failed;
if (magic != RELCACHE_INIT_FILEMAGIC)
goto read_failed;
@ -7101,16 +7230,24 @@ static bool load_relcache_init_file(bool shared)
int default_num;
/* first read the relation descriptor length */
nread = fread(&len, 1, sizeof(len), fp);
nread = fread_wrap(&len, 1, sizeof(len), fp);
if (nread != sizeof(len)) {
if (nread == 0)
break; /* end of file */
goto read_failed;
}
if (ENABLE_DSS) {
/* we append zero for 512 aligned in the end of file */
if (len == 0) {
break; // end of file for DSS
}
}
/* safety check for incompatible relcache layout */
if (len != sizeof(RelationData))
if (len != sizeof(RelationData)) {
goto read_failed;
}
/* allocate another relcache header */
if (num_rels >= max_rels) {
@ -7121,18 +7258,18 @@ static bool load_relcache_init_file(bool shared)
rel = rels[num_rels++] = (Relation)palloc(len);
/* then, read the Relation structure */
if (fread(rel, 1, len, fp) != len)
if (fread_wrap(rel, 1, len, fp) != len)
goto read_failed;
/* next read the relation tuple form */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len != CLASS_TUPLE_SIZE) {
goto read_failed;
}
relform = (Form_pg_class)palloc(len);
if (fread(relform, 1, len, fp) != len)
if (fread_wrap(relform, 1, len, fp) != len)
goto read_failed;
rel->rd_rel = relform;
@ -7149,11 +7286,11 @@ static bool load_relcache_init_file(bool shared)
has_not_null = false;
default_num = 0;
for (i = 0; i < relform->relnatts; i++) {
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len != ATTRIBUTE_FIXED_PART_SIZE)
goto read_failed;
if (fread(rel->rd_att->attrs[i], 1, len, fp) != len)
if (fread_wrap(rel->rd_att->attrs[i], 1, len, fp) != len)
goto read_failed;
has_not_null = has_not_null || rel->rd_att->attrs[i]->attnotnull;
@ -7172,11 +7309,11 @@ static bool load_relcache_init_file(bool shared)
}
/* next read the access method specific field */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len > 0 && len < MaxAllocSize) {
rel->rd_options = (bytea*)palloc(len);
if (fread(rel->rd_options, 1, len, fp) != len)
if (fread_wrap(rel->rd_options, 1, len, fp) != len)
goto read_failed;
if (len != VARSIZE(rel->rd_options))
goto read_failed; /* sanity check */
@ -7212,13 +7349,13 @@ static bool load_relcache_init_file(bool shared)
nailed_indexes++;
/* next, read the pg_index tuple */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len > HEAPTUPLESIZE + MaxIndexTuplesPerPage) {
goto read_failed;
}
rel->rd_indextuple = (HeapTuple)heaptup_alloc(len);
if (fread(rel->rd_indextuple, 1, len, fp) != len)
if (fread_wrap(rel->rd_indextuple, 1, len, fp) != len)
goto read_failed;
/* Fix up internal pointers in the tuple -- see heap_copytuple */
@ -7227,14 +7364,14 @@ static bool load_relcache_init_file(bool shared)
IndexRelationInitKeyNums(rel);
/* next, read the access method tuple form */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len != sizeof(FormData_pg_am)) {
goto read_failed;
}
am = (Form_pg_am)palloc(len);
if (fread(am, 1, len, fp) != len)
if (fread_wrap(am, 1, len, fp) != len)
goto read_failed;
rel->rd_am = am;
@ -7250,62 +7387,62 @@ static bool load_relcache_init_file(bool shared)
rel->rd_indexcxt = indexcxt;
/* next, read the vector of opfamily OIDs */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len > relform->relnatts * sizeof(Oid)) {
goto read_failed;
}
opfamily = (Oid*)MemoryContextAlloc(indexcxt, len);
if (fread(opfamily, 1, len, fp) != len)
if (fread_wrap(opfamily, 1, len, fp) != len)
goto read_failed;
rel->rd_opfamily = opfamily;
/* next, read the vector of opcintype OIDs */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len > relform->relnatts * sizeof(Oid)) {
goto read_failed;
}
opcintype = (Oid*)MemoryContextAlloc(indexcxt, len);
if (fread(opcintype, 1, len, fp) != len)
if (fread_wrap(opcintype, 1, len, fp) != len)
goto read_failed;
rel->rd_opcintype = opcintype;
/* next, read the vector of support procedure OIDs */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len > relform->relnatts * (am->amsupport * sizeof(RegProcedure))) {
goto read_failed;
}
support = (RegProcedure*)MemoryContextAlloc(indexcxt, len);
if (fread(support, 1, len, fp) != len)
if (fread_wrap(support, 1, len, fp) != len)
goto read_failed;
rel->rd_support = support;
/* next, read the vector of collation OIDs */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len > relform->relnatts * sizeof(Oid)) {
goto read_failed;
}
indcollation = (Oid*)MemoryContextAlloc(indexcxt, len);
if (fread(indcollation, 1, len, fp) != len)
if (fread_wrap(indcollation, 1, len, fp) != len)
goto read_failed;
rel->rd_indcollation = indcollation;
/* finally, read the vector of indoption values */
if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
if (fread_wrap(&len, 1, sizeof(len), fp) != sizeof(len))
goto read_failed;
if (len > relform->relnatts * sizeof(int16)) {
goto read_failed;
}
indoption = (int16*)MemoryContextAlloc(indexcxt, len);
if (fread(indoption, 1, len, fp) != len)
if (fread_wrap(indoption, 1, len, fp) != len)
goto read_failed;
rel->rd_indoption = indoption;
@ -7417,6 +7554,10 @@ static bool load_relcache_init_file(bool shared)
pfree_ext(rels);
FreeFile(fp);
if (ENABLE_DSS) {
free_aligned_buffer();
}
if (shared)
u_sess->relcache_cxt.criticalSharedRelcachesBuilt = true;
else
@ -7432,6 +7573,10 @@ read_failed:
pfree_ext(rels);
FreeFile(fp);
if (ENABLE_DSS) {
free_aligned_buffer();
}
return false;
}
@ -7441,7 +7586,7 @@ read_failed:
*/
static void write_relcache_init_file(bool shared)
{
if (ENABLE_DSS || SS_STANDBY_MODE) {
if (SS_STANDBY_MODE) {
return;
}
@ -7522,12 +7667,18 @@ static void write_relcache_init_file(bool shared)
return;
}
if (ENABLE_DSS) {
if (!init_aligned_buffer(false, fp)) {
ereport(FATAL, (errmsg("could not write init file")));
}
}
/*
* Write a magic number to serve as a file version identifier. We can
* change the magic number whenever the relcache layout changes.
*/
magic = RELCACHE_INIT_FILEMAGIC;
if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
if (fwrite_wrap(&magic, 1, sizeof(magic), fp) != sizeof(magic))
ereport(FATAL, (errmsg("could not write init file")));
/*
@ -7593,6 +7744,13 @@ static void write_relcache_init_file(bool shared)
}
}
if (ENABLE_DSS) {
if (!flush_align_buffer(fp)) {
ereport(FATAL, (errmsg("could not write init file")));
}
free_aligned_buffer();
}
if (FreeFile(fp))
ereport(FATAL, (errmsg("could not write init file")));
@ -7639,9 +7797,9 @@ static void write_relcache_init_file(bool shared)
/*
 * write_item
 *      Write one chunk of data to the init file, preceded by its length.
 *
 * data: start of the chunk.
 * len:  number of bytes in the chunk; emitted first as a raw Size value.
 * fp:   the init file stream.
 *
 * Both the length and the payload go through fwrite_wrap() only, so that
 * every byte passes the same path and nothing is written to the stream
 * directly alongside it.  A short write of either part is reported with
 * ereport(FATAL).
 */
static void write_item(const void* data, Size len, FILE* fp)
{
    if (fwrite_wrap(&len, 1, sizeof(len), fp) != sizeof(len)) {
        ereport(FATAL, (errmsg("could not write init file")));
    }
    if (fwrite_wrap(data, 1, len, fp) != len) {
        ereport(FATAL, (errmsg("could not write init file")));
    }
}

View File

@ -1689,6 +1689,19 @@ static void knl_t_sql_patch_init(knl_t_sql_patch_context* sql_patch_cxt)
sql_patch_cxt->sql_patch_prev_post_parse_analyze_hook = NULL;
}
/*
 * knl_t_dms_context_init
 *      Put every field of a thread's DMS context into its initial state:
 *      pointers NULL, flags false, counters zero.
 */
static void knl_t_dms_context_init(knl_t_dms_context *dms_cxt)
{
    /* pointers */
    dms_cxt->msgContext = NULL;
    dms_cxt->origin_buf = NULL;
    dms_cxt->aligned_buf = NULL;

    /* flags */
    dms_cxt->buf_in_aio = false;
    dms_cxt->is_reform_proc = false;
    dms_cxt->CloseAllSessionsFailed = false;

    /* staging-buffer bookkeeping */
    dms_cxt->size = 0;
    dms_cxt->offset = 0;
    dms_cxt->file_size = 0;
}
#ifdef ENABLE_MOT
static void knl_t_mot_init(knl_t_mot_context* mot_cxt)
{
@ -1870,6 +1883,7 @@ void knl_thread_init(knl_thread_role role)
knl_t_bgworker_init(&t_thrd.bgworker_cxt);
knl_index_advisor_init(&t_thrd.index_advisor_cxt);
knl_t_sql_patch_init(&t_thrd.sql_patch_cxt);
knl_t_dms_context_init(&t_thrd.dms_cxt);
KnlTApplyLauncherInit(&t_thrd.applylauncher_cxt);
KnlTApplyWorkerInit(&t_thrd.applyworker_cxt);
KnlTPublicationInit(&t_thrd.publication_cxt);

View File

@ -3332,6 +3332,11 @@ typedef struct knl_t_dms_context {
bool buf_in_aio;
bool is_reform_proc;
bool CloseAllSessionsFailed;
char *origin_buf; /* origin buffer for unaligned read/write */
char *aligned_buf;
int size; /* aligned buffer size */
int offset; /* current read/write position in aligned_buf */
int file_size; /* initialized as pg_internal.init file size, will decrease after read */
} knl_t_dms_context;
/* thread context. */