diff --git a/src/gausskernel/ddes/adapter/ss_aio.cpp b/src/gausskernel/ddes/adapter/ss_aio.cpp index d0067a2b2..e903c83aa 100644 --- a/src/gausskernel/ddes/adapter/ss_aio.cpp +++ b/src/gausskernel/ddes/adapter/ss_aio.cpp @@ -49,6 +49,7 @@ static void WaitDSSAioComplete(DSSAioCxt *aio_cxt, int index) for (int i = 0; i < num; i++) { aio_cxt->aiocb(&aio->events[i]); } + event_num -= num; } diff --git a/src/gausskernel/process/postmaster/pagewriter.cpp b/src/gausskernel/process/postmaster/pagewriter.cpp index 96836e373..4c4f0dfc2 100755 --- a/src/gausskernel/process/postmaster/pagewriter.cpp +++ b/src/gausskernel/process/postmaster/pagewriter.cpp @@ -47,6 +47,7 @@ #include "gstrace/gstrace_infra.h" #include "gstrace/postmaster_gstrace.h" #include "ddes/dms/ss_dms_bufmgr.h" +#include "storage/dss/fio_dss.h" #include @@ -323,6 +324,8 @@ void incre_ckpt_pagewriter_cxt_init() /* 2M AIO buffer */ char *unaligned_buf = (char *)palloc0(DSS_AIO_BATCH_SIZE * DSS_AIO_UTIL_NUM * BLCKSZ + BLCKSZ); pgwr->aio_buf = (char *)TYPEALIGN(BLCKSZ, unaligned_buf); + pgwr->aio_extra = + (PgwrAioExtraData *)palloc0(DSS_AIO_BATCH_SIZE * DSS_AIO_UTIL_NUM * sizeof(PgwrAioExtraData)); } } @@ -1643,7 +1646,8 @@ void crps_destory_ctxs() static void incre_ckpt_aio_callback(struct io_event *event) { - BufferDesc *buf_desc = (BufferDesc *)(event->data); + PgwrAioExtraData *tempAioExtra = (PgwrAioExtraData *)(event->data); + BufferDesc *buf_desc = (BufferDesc *)(tempAioExtra->aio_bufdesc); uint32 written_size = event->obj->u.c.nbytes; if (written_size != event->res) { ereport(WARNING, (errmsg("aio write failed (errno = %d), buffer: %d/%d/%d/%d/%d %d-%d", -(int32)(event->res), @@ -1687,6 +1691,21 @@ static void incre_ckpt_aio_callback(struct io_event *event) pfree(origin_buf); #endif + off_t roffset = 0; + if (IsSegmentBufferID(buf_desc->buf_id)) { + roffset = ((buf_desc->tag.blockNum) % RELSEG_SIZE) * BLCKSZ; + } else { + roffset = ((buf_desc->extra->seg_blockno) % RELSEG_SIZE) * BLCKSZ; + } + + int aioRet = dss_aio_post_pwrite(event->obj->data, tempAioExtra->aio_fd, event->obj->u.c.nbytes, roffset); + if (aioRet != 0) { + ereport(PANIC, (errmsg("failed to post write by asnyc io (errno = %d), buffer: %d/%d/%d/%d/%d %d-%d", errno, + buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, + (int32)buf_desc->tag.rnode.bucketNode, (int32)buf_desc->tag.rnode.opt, + buf_desc->tag.forkNum, buf_desc->tag.blockNum))); + } + buf_desc->extra->aio_in_progress = false; UnpinBuffer(buf_desc, true); } diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 9ab157778..e062afafd 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -5147,8 +5147,10 @@ void FlushBuffer(void *buf, SMgrRelation reln, ReadBufferMethod flushmethod, boo securec_check(ret, "\0", "\0"); struct iocb *iocb_ptr = DSSAioGetIOCB(aio_cxt); + PgwrAioExtraData* tempAioExtra = &(pgwr->aio_extra[aiobuf_id]); int32 io_ret = seg_physical_aio_prep_pwrite(spc, fakenode, bufferinfo.blockinfo.forknum, - bufdesc->extra->seg_blockno, tempBuf, (void *)iocb_ptr); + bufdesc->extra->seg_blockno, tempBuf, (void *)iocb_ptr, (void *)tempAioExtra); + tempAioExtra->aio_bufdesc = (void *)bufdesc; if (io_ret != DSS_SUCCESS) { ereport(PANIC, (errmsg("dss aio failed, buffer: %d/%d/%d/%d/%d %d-%u", fakenode.spcNode, fakenode.dbNode, fakenode.relNode, (int)fakenode.bucketNode, @@ -5164,7 +5166,7 @@ void FlushBuffer(void *buf, SMgrRelation reln, ReadBufferMethod flushmethod, boo t_thrd.dms_cxt.buf_in_aio = true; bufdesc->extra->aio_in_progress = true; /* should be after io_prep_pwrite, because io_prep_pwrite will memset iocb struct */ - iocb_ptr->data = (void *)bufdesc; + iocb_ptr->data = (void *)tempAioExtra; DSSAioAppendIOCB(aio_cxt, iocb_ptr); } else { seg_physical_write(spc, fakenode, bufferinfo.blockinfo.forknum, bufdesc->extra->seg_blockno, bufToWrite, diff --git a/src/gausskernel/storage/dss/dss_adaptor.cpp b/src/gausskernel/storage/dss/dss_adaptor.cpp index 0b1ce5c67..5af7d11f1 100644 --- a/src/gausskernel/storage/dss/dss_adaptor.cpp +++ b/src/gausskernel/storage/dss/dss_adaptor.cpp @@ -121,8 +121,7 @@ int dss_device_init(const char *conn_path, bool enable_dss) SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_stat", (void **)&device_op.dss_stat)); SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_lstat", (void **)&device_op.dss_lstat)); SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_fstat", (void **)&device_op.dss_fstat)); - SS_RETURN_IFERR( - dss_load_symbol(device_op.handle, "dss_set_main_inst", (void **)&device_op.dss_set_main_inst)); + SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_set_main_inst", (void **)&device_op.dss_set_main_inst)); SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_set_svr_path", (void **)&device_op.dss_set_svr_path)); SS_RETURN_IFERR( dss_load_symbol(device_op.handle, "dss_register_log_callback", (void **)&device_op.dss_register_log_callback)); @@ -131,6 +130,7 @@ int dss_device_init(const char *conn_path, bool enable_dss) SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_compare_size_equal", (void **)&device_op.dss_compare_size)); SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_prep_pwrite", (void **)&device_op.dss_aio_pwrite)); SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_prep_pread", (void **)&device_op.dss_aio_pread)); + SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_post_pwrite", (void **)&device_op.dss_aio_post_pwrite)); SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_init_logger", (void **)&device_op.dss_init_logger)); SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_refresh_logger", (void **)&device_op.dss_refresh_logger)); diff --git a/src/gausskernel/storage/dss/fio_dss.cpp b/src/gausskernel/storage/dss/fio_dss.cpp index afbee1add..8471f21e9 100644 --- a/src/gausskernel/storage/dss/fio_dss.cpp +++ b/src/gausskernel/storage/dss/fio_dss.cpp @@ -721,3 +721,8 @@ int dss_aio_prep_pread(void *iocb, int fd, void *buf, size_t count, long long of { return g_dss_device_op.dss_aio_pread(iocb, fd, buf, count, offset); } + +int dss_aio_post_pwrite(void *iocb, int fd, size_t count, long long offset) +{ + return g_dss_device_op.dss_aio_post_pwrite(iocb, fd, count, offset); +} \ No newline at end of file diff --git a/src/gausskernel/storage/smgr/segment/data_file.cpp b/src/gausskernel/storage/smgr/segment/data_file.cpp index 593be40bb..16253c639 100644 --- a/src/gausskernel/storage/smgr/segment/data_file.cpp +++ b/src/gausskernel/storage/smgr/segment/data_file.cpp @@ -503,7 +503,11 @@ void df_extend_internal(SegLogicFile *sf) df_extend_file_vector(sf); } int new_fd; - new_fd = dv_open_file(filename, O_RDWR | O_CREAT, SEGMENT_FILE_MODE); + if (ENABLE_DSS) { + new_fd = dv_open_file(filename, O_RDWR | O_CREAT | DSS_FT_NODE_FLAG_INNER_INITED, SEGMENT_FILE_MODE); + } else { + new_fd = dv_open_file(filename, O_RDWR | O_CREAT, SEGMENT_FILE_MODE); + } if (new_fd < 0) { ereport(ERROR, (errcode_for_file_access(), errmsg("[segpage] could not create file \"%s\": %m", filename))); } diff --git a/src/gausskernel/storage/smgr/segment/segbuffer.cpp b/src/gausskernel/storage/smgr/segment/segbuffer.cpp index 448f6acb6..85bbe77e1 100644 --- a/src/gausskernel/storage/smgr/segment/segbuffer.cpp +++ b/src/gausskernel/storage/smgr/segment/segbuffer.cpp @@ -420,8 +420,10 @@ void SegFlushBuffer(BufferDesc *buf, SMgrRelation reln) securec_check(ret, "\0", "\0"); struct iocb *iocb_ptr = DSSAioGetIOCB(aio_cxt); + PgwrAioExtraData* tempAioExtra = &(pgwr->aio_extra[aiobuf_id]); int32 io_ret = seg_physical_aio_prep_pwrite(spc, buf->tag.rnode, buf->tag.forkNum, - buf->tag.blockNum, tempBuf, (void *)iocb_ptr); + buf->tag.blockNum, tempBuf, (void *)iocb_ptr, (void *)tempAioExtra); + tempAioExtra->aio_bufdesc = (void *)buf; if (io_ret != DSS_SUCCESS) { ereport(PANIC, (errmsg("dss aio failed, buffer: %d/%d/%d/%d/%d %d-%u", buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, (int)buf->tag.rnode.bucketNode, @@ -437,7 +439,7 @@ void SegFlushBuffer(BufferDesc *buf, SMgrRelation reln) buf->extra->aio_in_progress = true; t_thrd.dms_cxt.buf_in_aio = true; /* should be after io_prep_pwrite, because io_prep_pwrite will memset iocb struct */ - iocb_ptr->data = (void *)buf; + iocb_ptr->data = (void *)tempAioExtra; DSSAioAppendIOCB(aio_cxt, iocb_ptr); } else { seg_physical_write(spc, buf->tag.rnode, buf->tag.forkNum, buf->tag.blockNum, (char *)buf_to_write, false); diff --git a/src/gausskernel/storage/smgr/segment/space.cpp b/src/gausskernel/storage/smgr/segment/space.cpp index 3843de05a..dff60bdc4 100644 --- a/src/gausskernel/storage/smgr/segment/space.cpp +++ b/src/gausskernel/storage/smgr/segment/space.cpp @@ -40,8 +40,8 @@ #include "utils/relfilenodemap.h" #include "pgxc/execRemote.h" #include "ddes/dms/ss_transaction.h" +#include "ddes/dms/ss_aio.h" #include "storage/file/fio_device.h" -#include "libaio.h" static void SSInitSegLogicFile(SegSpace *spc); @@ -108,7 +108,7 @@ void spc_write_block(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, con } int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, BlockNumber blocknum, - const char *buffer, void *iocb_ptr) + const char *buffer, void *iocb_ptr, void *tempAioExtra) { int egid = EXTENT_TYPE_TO_GROUPID(relNode.relNode); SegExtentGroup *seg = &spc->extent_group[egid][forknum]; @@ -120,6 +120,7 @@ int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum SegPhysicalFile spf = df_get_physical_file(seg->segfile, sliceno, blocknum); int32 ret; if (is_dss_fd(spf.fd)) { + ((PgwrAioExtraData *)tempAioExtra)->aio_fd = spf.fd; ret = dss_aio_prep_pwrite(iocb_ptr, spf.fd, (void *)buffer, BLCKSZ, roffset); } else { io_prep_pwrite((struct iocb *)iocb_ptr, spf.fd, (void *)buffer, BLCKSZ, roffset); diff --git a/src/gausskernel/storage/smgr/segstore.cpp b/src/gausskernel/storage/smgr/segstore.cpp index 6102cce6a..dfe77382f 100755 --- a/src/gausskernel/storage/smgr/segstore.cpp +++ b/src/gausskernel/storage/smgr/segstore.cpp @@ -1922,12 +1922,12 @@ void seg_physical_write(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, B } int32 seg_physical_aio_prep_pwrite(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, BlockNumber blocknum, - const char *buffer, void *iocb_ptr) + const char *buffer, void *iocb_ptr, void *tempAioExtra) { SegmentCheck(IsSegmentPhysicalRelNode(rNode)); SegmentCheck(spc != NULL); - return spc_aio_prep_pwrite(spc, rNode, forknum, blocknum, buffer, iocb_ptr); + return spc_aio_prep_pwrite(spc, rNode, forknum, blocknum, buffer, iocb_ptr, tempAioExtra); } static bool check_meta_data(BlockNumber extent, uint32 extent_size, uint32* offset_block) { diff --git a/src/include/ddes/dms/ss_aio.h b/src/include/ddes/dms/ss_aio.h index db41149e1..58d3c2b97 100644 --- a/src/include/ddes/dms/ss_aio.h +++ b/src/include/ddes/dms/ss_aio.h @@ -49,6 +49,11 @@ typedef struct DSSAioCxt { AioUtil aio[DSS_AIO_UTIL_NUM]; } DSSAioCxt; +typedef struct PgwrAioExtraData { + int aio_fd; + void* aio_bufdesc; +} PgwrAioExtraData; + void DSSAioInitialize(DSSAioCxt *aio_cxt, aio_callback callback); void DSSAioDestroy(DSSAioCxt *aio_cxt); struct iocb* DSSAioGetIOCB(DSSAioCxt *aio_cxt); diff --git a/src/include/postmaster/pagewriter.h b/src/include/postmaster/pagewriter.h index 6d458f6cd..c55f428e9 100644 --- a/src/include/postmaster/pagewriter.h +++ b/src/include/postmaster/pagewriter.h @@ -87,6 +87,7 @@ typedef struct PageWriterProc { /* auxiluary structs for implementing AIO in DSS */ DSSAioCxt aio_cxt; char *aio_buf; + PgwrAioExtraData* aio_extra; } PageWriterProc; typedef struct PageWriterProcs { diff --git a/src/include/storage/dss/dss_adaptor.h b/src/include/storage/dss/dss_adaptor.h index 481d3fe91..e81b0ff59 100644 --- a/src/include/storage/dss/dss_adaptor.h +++ b/src/include/storage/dss/dss_adaptor.h @@ -69,7 +69,9 @@ typedef int (*dss_get_storage_addr)(int handle, long long offset, char *poolname typedef int (*dss_compare_size_equal)(const char *vg_name, long long *au_size); typedef int (*dss_aio_prep_pwrite_device)(void *iocb, int handle, void *buf, size_t count, long long offset); typedef int (*dss_aio_prep_pread_device)(void *iocb, int handle, void *buf, size_t count, long long offset); -typedef int (*dss_init_logger_t)(char *log_home, unsigned int log_level, unsigned int log_backup_file_count, unsigned long long log_max_file_size); +typedef int (*dss_aio_post_pwrite_device)(void *iocb, int handle, size_t count, long long offset); +typedef int (*dss_init_logger_t)(char *log_home, unsigned int log_level, unsigned int log_backup_file_count, + unsigned long long log_max_file_size); typedef void (*dss_refresh_logger_t)(char *log_field, unsigned long long *value); typedef int (*dss_set_main)(void); typedef struct st_dss_device_op_t { @@ -109,6 +111,7 @@ typedef struct st_dss_device_op_t { dss_compare_size_equal dss_compare_size; dss_aio_prep_pwrite_device dss_aio_pwrite; dss_aio_prep_pread_device dss_aio_pread; + dss_aio_post_pwrite_device dss_aio_post_pwrite; dss_init_logger_t dss_init_logger; dss_refresh_logger_t dss_refresh_logger; dss_set_main dss_set_main_inst; diff --git a/src/include/storage/dss/fio_dss.h b/src/include/storage/dss/fio_dss.h index f179f90f2..ccea0e151 100644 --- a/src/include/storage/dss/fio_dss.h +++ b/src/include/storage/dss/fio_dss.h @@ -80,5 +80,5 @@ int dss_get_addr(int handle, long long offset, char *poolname, char *imagename, int dss_compare_size(const char *vg_name, long long *au_size); int dss_aio_prep_pwrite(void *iocb, int fd, void *buf, size_t count, long long offset); int dss_aio_prep_pread(void *iocb, int fd, void *buf, size_t count, long long offset); - +int dss_aio_post_pwrite(void *iocb, int fd, size_t count, long long offset); #endif // FIO_DSS_H diff --git a/src/include/storage/smgr/segment.h b/src/include/storage/smgr/segment.h index 4e4c2caeb..a87f7aa50 100644 --- a/src/include/storage/smgr/segment.h +++ b/src/include/storage/smgr/segment.h @@ -60,7 +60,7 @@ void seg_physical_write(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, B XLogRecPtr seg_get_headlsn(SegSpace *spc, BlockNumber blockNum, bool isbucket); int32 seg_physical_aio_prep_pwrite(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, BlockNumber blocknum, - const char *buffer, void *iocb_ptr); + const char *buffer, void *iocb_ptr, void *tempAioExtra); /* segment sync callback */ void forget_space_fsync_request(SegSpace *spc); diff --git a/src/include/storage/smgr/segment_internal.h b/src/include/storage/smgr/segment_internal.h index a29a415c5..eb1943811 100644 --- a/src/include/storage/smgr/segment_internal.h +++ b/src/include/storage/smgr/segment_internal.h @@ -416,7 +416,7 @@ void spc_datafile_create(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknu void spc_extend_file(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum, BlockNumber blkno); bool spc_datafile_exist(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum); int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, BlockNumber blocknum, - const char *buffer, void *iocb_ptr); + const char *buffer, void *iocb_ptr, void *tempAioExtra); extern void spc_shrink_files(SegExtentGroup *seg, BlockNumber target_size, bool redo);