新增适配dss aio post pwrite接口

This commit is contained in:
arcoalien@qq.com
2024-07-30 15:03:31 +08:00
committed by cchen676
parent 6f7cd7ade9
commit 0a179f7f76
15 changed files with 59 additions and 16 deletions

View File

@ -49,6 +49,7 @@ static void WaitDSSAioComplete(DSSAioCxt *aio_cxt, int index)
for (int i = 0; i < num; i++) {
aio_cxt->aiocb(&aio->events[i]);
}
event_num -= num;
}

View File

@ -47,6 +47,7 @@
#include "gstrace/gstrace_infra.h"
#include "gstrace/postmaster_gstrace.h"
#include "ddes/dms/ss_dms_bufmgr.h"
#include "storage/dss/fio_dss.h"
#include <zstd.h>
@ -323,6 +324,8 @@ void incre_ckpt_pagewriter_cxt_init()
/* 2M AIO buffer */
char *unaligned_buf = (char *)palloc0(DSS_AIO_BATCH_SIZE * DSS_AIO_UTIL_NUM * BLCKSZ + BLCKSZ);
pgwr->aio_buf = (char *)TYPEALIGN(BLCKSZ, unaligned_buf);
pgwr->aio_extra =
(PgwrAioExtraData *)palloc0(DSS_AIO_BATCH_SIZE * DSS_AIO_UTIL_NUM * sizeof(PgwrAioExtraData));
}
}
@ -1643,7 +1646,8 @@ void crps_destory_ctxs()
static void incre_ckpt_aio_callback(struct io_event *event)
{
BufferDesc *buf_desc = (BufferDesc *)(event->data);
PgwrAioExtraData *tempAioExtra = (PgwrAioExtraData *)(event->data);
BufferDesc *buf_desc = (BufferDesc *)(tempAioExtra->aio_bufdesc);
uint32 written_size = event->obj->u.c.nbytes;
if (written_size != event->res) {
ereport(WARNING, (errmsg("aio write failed (errno = %d), buffer: %d/%d/%d/%d/%d %d-%d", -(int32)(event->res),
@ -1687,6 +1691,21 @@ static void incre_ckpt_aio_callback(struct io_event *event)
pfree(origin_buf);
#endif
off_t roffset = 0;
if (IsSegmentBufferID(buf_desc->buf_id)) {
roffset = ((buf_desc->tag.blockNum) % RELSEG_SIZE) * BLCKSZ;
} else {
roffset = ((buf_desc->extra->seg_blockno) % RELSEG_SIZE) * BLCKSZ;
}
int aioRet = dss_aio_post_pwrite(event->obj->data, tempAioExtra->aio_fd, event->obj->u.c.nbytes, roffset);
if (aioRet != 0) {
ereport(PANIC, (errmsg("failed to post write by asnyc io (errno = %d), buffer: %d/%d/%d/%d/%d %d-%d", errno,
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
(int32)buf_desc->tag.rnode.bucketNode, (int32)buf_desc->tag.rnode.opt,
buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
}
buf_desc->extra->aio_in_progress = false;
UnpinBuffer(buf_desc, true);
}

View File

@ -5147,8 +5147,10 @@ void FlushBuffer(void *buf, SMgrRelation reln, ReadBufferMethod flushmethod, boo
securec_check(ret, "\0", "\0");
struct iocb *iocb_ptr = DSSAioGetIOCB(aio_cxt);
PgwrAioExtraData* tempAioExtra = &(pgwr->aio_extra[aiobuf_id]);
int32 io_ret = seg_physical_aio_prep_pwrite(spc, fakenode, bufferinfo.blockinfo.forknum,
bufdesc->extra->seg_blockno, tempBuf, (void *)iocb_ptr);
bufdesc->extra->seg_blockno, tempBuf, (void *)iocb_ptr, (void *)tempAioExtra);
tempAioExtra->aio_bufdesc = (void *)bufdesc;
if (io_ret != DSS_SUCCESS) {
ereport(PANIC, (errmsg("dss aio failed, buffer: %d/%d/%d/%d/%d %d-%u",
fakenode.spcNode, fakenode.dbNode, fakenode.relNode, (int)fakenode.bucketNode,
@ -5164,7 +5166,7 @@ void FlushBuffer(void *buf, SMgrRelation reln, ReadBufferMethod flushmethod, boo
t_thrd.dms_cxt.buf_in_aio = true;
bufdesc->extra->aio_in_progress = true;
/* should be after io_prep_pwrite, because io_prep_pwrite will memset iocb struct */
iocb_ptr->data = (void *)bufdesc;
iocb_ptr->data = (void *)tempAioExtra;
DSSAioAppendIOCB(aio_cxt, iocb_ptr);
} else {
seg_physical_write(spc, fakenode, bufferinfo.blockinfo.forknum, bufdesc->extra->seg_blockno, bufToWrite,

View File

@ -121,8 +121,7 @@ int dss_device_init(const char *conn_path, bool enable_dss)
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_stat", (void **)&device_op.dss_stat));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_lstat", (void **)&device_op.dss_lstat));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_fstat", (void **)&device_op.dss_fstat));
SS_RETURN_IFERR(
dss_load_symbol(device_op.handle, "dss_set_main_inst", (void **)&device_op.dss_set_main_inst));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_set_main_inst", (void **)&device_op.dss_set_main_inst));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_set_svr_path", (void **)&device_op.dss_set_svr_path));
SS_RETURN_IFERR(
dss_load_symbol(device_op.handle, "dss_register_log_callback", (void **)&device_op.dss_register_log_callback));
@ -131,6 +130,7 @@ int dss_device_init(const char *conn_path, bool enable_dss)
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_compare_size_equal", (void **)&device_op.dss_compare_size));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_prep_pwrite", (void **)&device_op.dss_aio_pwrite));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_prep_pread", (void **)&device_op.dss_aio_pread));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_post_pwrite", (void **)&device_op.dss_aio_post_pwrite));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_init_logger", (void **)&device_op.dss_init_logger));
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_refresh_logger", (void **)&device_op.dss_refresh_logger));

View File

@ -721,3 +721,8 @@ int dss_aio_prep_pread(void *iocb, int fd, void *buf, size_t count, long long of
{
return g_dss_device_op.dss_aio_pread(iocb, fd, buf, count, offset);
}
int dss_aio_post_pwrite(void *iocb, int fd, size_t count, long long offset)
{
return g_dss_device_op.dss_aio_post_pwrite(iocb, fd, count, offset);
}

View File

@ -503,7 +503,11 @@ void df_extend_internal(SegLogicFile *sf)
df_extend_file_vector(sf);
}
int new_fd;
new_fd = dv_open_file(filename, O_RDWR | O_CREAT, SEGMENT_FILE_MODE);
if (ENABLE_DSS) {
new_fd = dv_open_file(filename, O_RDWR | O_CREAT | DSS_FT_NODE_FLAG_INNER_INITED, SEGMENT_FILE_MODE);
} else {
new_fd = dv_open_file(filename, O_RDWR | O_CREAT, SEGMENT_FILE_MODE);
}
if (new_fd < 0) {
ereport(ERROR, (errcode_for_file_access(), errmsg("[segpage] could not create file \"%s\": %m", filename)));
}

View File

@ -420,8 +420,10 @@ void SegFlushBuffer(BufferDesc *buf, SMgrRelation reln)
securec_check(ret, "\0", "\0");
struct iocb *iocb_ptr = DSSAioGetIOCB(aio_cxt);
PgwrAioExtraData* tempAioExtra = &(pgwr->aio_extra[aiobuf_id]);
int32 io_ret = seg_physical_aio_prep_pwrite(spc, buf->tag.rnode, buf->tag.forkNum,
buf->tag.blockNum, tempBuf, (void *)iocb_ptr);
buf->tag.blockNum, tempBuf, (void *)iocb_ptr, (void *)tempAioExtra);
tempAioExtra->aio_bufdesc = (void *)buf;
if (io_ret != DSS_SUCCESS) {
ereport(PANIC, (errmsg("dss aio failed, buffer: %d/%d/%d/%d/%d %d-%u",
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, (int)buf->tag.rnode.bucketNode,
@ -437,7 +439,7 @@ void SegFlushBuffer(BufferDesc *buf, SMgrRelation reln)
buf->extra->aio_in_progress = true;
t_thrd.dms_cxt.buf_in_aio = true;
/* should be after io_prep_pwrite, because io_prep_pwrite will memset iocb struct */
iocb_ptr->data = (void *)buf;
iocb_ptr->data = (void *)tempAioExtra;
DSSAioAppendIOCB(aio_cxt, iocb_ptr);
} else {
seg_physical_write(spc, buf->tag.rnode, buf->tag.forkNum, buf->tag.blockNum, (char *)buf_to_write, false);

View File

@ -40,8 +40,8 @@
#include "utils/relfilenodemap.h"
#include "pgxc/execRemote.h"
#include "ddes/dms/ss_transaction.h"
#include "ddes/dms/ss_aio.h"
#include "storage/file/fio_device.h"
#include "libaio.h"
static void SSInitSegLogicFile(SegSpace *spc);
@ -108,7 +108,7 @@ void spc_write_block(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, con
}
int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, BlockNumber blocknum,
const char *buffer, void *iocb_ptr)
const char *buffer, void *iocb_ptr, void *tempAioExtra)
{
int egid = EXTENT_TYPE_TO_GROUPID(relNode.relNode);
SegExtentGroup *seg = &spc->extent_group[egid][forknum];
@ -120,6 +120,7 @@ int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum
SegPhysicalFile spf = df_get_physical_file(seg->segfile, sliceno, blocknum);
int32 ret;
if (is_dss_fd(spf.fd)) {
((PgwrAioExtraData *)tempAioExtra)->aio_fd = spf.fd;
ret = dss_aio_prep_pwrite(iocb_ptr, spf.fd, (void *)buffer, BLCKSZ, roffset);
} else {
io_prep_pwrite((struct iocb *)iocb_ptr, spf.fd, (void *)buffer, BLCKSZ, roffset);

View File

@ -1922,12 +1922,12 @@ void seg_physical_write(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, B
}
int32 seg_physical_aio_prep_pwrite(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, BlockNumber blocknum,
const char *buffer, void *iocb_ptr)
const char *buffer, void *iocb_ptr, void *tempAioExtra)
{
SegmentCheck(IsSegmentPhysicalRelNode(rNode));
SegmentCheck(spc != NULL);
return spc_aio_prep_pwrite(spc, rNode, forknum, blocknum, buffer, iocb_ptr);
return spc_aio_prep_pwrite(spc, rNode, forknum, blocknum, buffer, iocb_ptr, tempAioExtra);
}
static bool check_meta_data(BlockNumber extent, uint32 extent_size, uint32* offset_block)
{

View File

@ -49,6 +49,11 @@ typedef struct DSSAioCxt {
AioUtil aio[DSS_AIO_UTIL_NUM];
} DSSAioCxt;
typedef struct PgwrAioExtraData {
int aio_fd;
void* aio_bufdesc;
} PgwrAioExtraData;
void DSSAioInitialize(DSSAioCxt *aio_cxt, aio_callback callback);
void DSSAioDestroy(DSSAioCxt *aio_cxt);
struct iocb* DSSAioGetIOCB(DSSAioCxt *aio_cxt);

View File

@ -87,6 +87,7 @@ typedef struct PageWriterProc {
/* auxiluary structs for implementing AIO in DSS */
DSSAioCxt aio_cxt;
char *aio_buf;
PgwrAioExtraData* aio_extra;
} PageWriterProc;
typedef struct PageWriterProcs {

View File

@ -69,7 +69,9 @@ typedef int (*dss_get_storage_addr)(int handle, long long offset, char *poolname
typedef int (*dss_compare_size_equal)(const char *vg_name, long long *au_size);
typedef int (*dss_aio_prep_pwrite_device)(void *iocb, int handle, void *buf, size_t count, long long offset);
typedef int (*dss_aio_prep_pread_device)(void *iocb, int handle, void *buf, size_t count, long long offset);
typedef int (*dss_init_logger_t)(char *log_home, unsigned int log_level, unsigned int log_backup_file_count, unsigned long long log_max_file_size);
typedef int (*dss_aio_post_pwrite_device)(void *iocb, int handle, size_t count, long long offset);
typedef int (*dss_init_logger_t)(char *log_home, unsigned int log_level, unsigned int log_backup_file_count,
unsigned long long log_max_file_size);
typedef void (*dss_refresh_logger_t)(char *log_field, unsigned long long *value);
typedef int (*dss_set_main)(void);
typedef struct st_dss_device_op_t {
@ -109,6 +111,7 @@ typedef struct st_dss_device_op_t {
dss_compare_size_equal dss_compare_size;
dss_aio_prep_pwrite_device dss_aio_pwrite;
dss_aio_prep_pread_device dss_aio_pread;
dss_aio_post_pwrite_device dss_aio_post_pwrite;
dss_init_logger_t dss_init_logger;
dss_refresh_logger_t dss_refresh_logger;
dss_set_main dss_set_main_inst;

View File

@ -80,5 +80,5 @@ int dss_get_addr(int handle, long long offset, char *poolname, char *imagename,
int dss_compare_size(const char *vg_name, long long *au_size);
int dss_aio_prep_pwrite(void *iocb, int fd, void *buf, size_t count, long long offset);
int dss_aio_prep_pread(void *iocb, int fd, void *buf, size_t count, long long offset);
int dss_aio_post_pwrite(void *iocb, int fd, size_t count, long long offset);
#endif // FIO_DSS_H

View File

@ -60,7 +60,7 @@ void seg_physical_write(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, B
XLogRecPtr seg_get_headlsn(SegSpace *spc, BlockNumber blockNum, bool isbucket);
int32 seg_physical_aio_prep_pwrite(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, BlockNumber blocknum,
const char *buffer, void *iocb_ptr);
const char *buffer, void *iocb_ptr, void *tempAioExtra);
/* segment sync callback */
void forget_space_fsync_request(SegSpace *spc);

View File

@ -416,7 +416,7 @@ void spc_datafile_create(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknu
void spc_extend_file(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum, BlockNumber blkno);
bool spc_datafile_exist(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum);
int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, BlockNumber blocknum,
const char *buffer, void *iocb_ptr);
const char *buffer, void *iocb_ptr, void *tempAioExtra);
extern void spc_shrink_files(SegExtentGroup *seg, BlockNumber target_size, bool redo);