!5906 新增适配dss aio post pwrite接口
Merge pull request !5906 from cchen676/240710master
This commit is contained in:
@ -49,6 +49,7 @@ static void WaitDSSAioComplete(DSSAioCxt *aio_cxt, int index)
|
||||
for (int i = 0; i < num; i++) {
|
||||
aio_cxt->aiocb(&aio->events[i]);
|
||||
}
|
||||
|
||||
event_num -= num;
|
||||
}
|
||||
|
||||
|
||||
@ -47,6 +47,7 @@
|
||||
#include "gstrace/gstrace_infra.h"
|
||||
#include "gstrace/postmaster_gstrace.h"
|
||||
#include "ddes/dms/ss_dms_bufmgr.h"
|
||||
#include "storage/dss/fio_dss.h"
|
||||
|
||||
#include <zstd.h>
|
||||
|
||||
@ -323,6 +324,8 @@ void incre_ckpt_pagewriter_cxt_init()
|
||||
/* 2M AIO buffer */
|
||||
char *unaligned_buf = (char *)palloc0(DSS_AIO_BATCH_SIZE * DSS_AIO_UTIL_NUM * BLCKSZ + BLCKSZ);
|
||||
pgwr->aio_buf = (char *)TYPEALIGN(BLCKSZ, unaligned_buf);
|
||||
pgwr->aio_extra =
|
||||
(PgwrAioExtraData *)palloc0(DSS_AIO_BATCH_SIZE * DSS_AIO_UTIL_NUM * sizeof(PgwrAioExtraData));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1643,7 +1646,8 @@ void crps_destory_ctxs()
|
||||
|
||||
static void incre_ckpt_aio_callback(struct io_event *event)
|
||||
{
|
||||
BufferDesc *buf_desc = (BufferDesc *)(event->data);
|
||||
PgwrAioExtraData *tempAioExtra = (PgwrAioExtraData *)(event->data);
|
||||
BufferDesc *buf_desc = (BufferDesc *)(tempAioExtra->aio_bufdesc);
|
||||
uint32 written_size = event->obj->u.c.nbytes;
|
||||
if (written_size != event->res) {
|
||||
ereport(WARNING, (errmsg("aio write failed (errno = %d), buffer: %d/%d/%d/%d/%d %d-%d", -(int32)(event->res),
|
||||
@ -1687,6 +1691,21 @@ static void incre_ckpt_aio_callback(struct io_event *event)
|
||||
pfree(origin_buf);
|
||||
#endif
|
||||
|
||||
off_t roffset = 0;
|
||||
if (IsSegmentBufferID(buf_desc->buf_id)) {
|
||||
roffset = ((buf_desc->tag.blockNum) % RELSEG_SIZE) * BLCKSZ;
|
||||
} else {
|
||||
roffset = ((buf_desc->extra->seg_blockno) % RELSEG_SIZE) * BLCKSZ;
|
||||
}
|
||||
|
||||
int aioRet = dss_aio_post_pwrite(event->obj->data, tempAioExtra->aio_fd, event->obj->u.c.nbytes, roffset);
|
||||
if (aioRet != 0) {
|
||||
ereport(PANIC, (errmsg("failed to post write by asnyc io (errno = %d), buffer: %d/%d/%d/%d/%d %d-%d", errno,
|
||||
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
|
||||
(int32)buf_desc->tag.rnode.bucketNode, (int32)buf_desc->tag.rnode.opt,
|
||||
buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
|
||||
}
|
||||
|
||||
buf_desc->extra->aio_in_progress = false;
|
||||
UnpinBuffer(buf_desc, true);
|
||||
}
|
||||
|
||||
@ -5147,8 +5147,10 @@ void FlushBuffer(void *buf, SMgrRelation reln, ReadBufferMethod flushmethod, boo
|
||||
securec_check(ret, "\0", "\0");
|
||||
|
||||
struct iocb *iocb_ptr = DSSAioGetIOCB(aio_cxt);
|
||||
PgwrAioExtraData* tempAioExtra = &(pgwr->aio_extra[aiobuf_id]);
|
||||
int32 io_ret = seg_physical_aio_prep_pwrite(spc, fakenode, bufferinfo.blockinfo.forknum,
|
||||
bufdesc->extra->seg_blockno, tempBuf, (void *)iocb_ptr);
|
||||
bufdesc->extra->seg_blockno, tempBuf, (void *)iocb_ptr, (void *)tempAioExtra);
|
||||
tempAioExtra->aio_bufdesc = (void *)bufdesc;
|
||||
if (io_ret != DSS_SUCCESS) {
|
||||
ereport(PANIC, (errmsg("dss aio failed, buffer: %d/%d/%d/%d/%d %d-%u",
|
||||
fakenode.spcNode, fakenode.dbNode, fakenode.relNode, (int)fakenode.bucketNode,
|
||||
@ -5164,7 +5166,7 @@ void FlushBuffer(void *buf, SMgrRelation reln, ReadBufferMethod flushmethod, boo
|
||||
t_thrd.dms_cxt.buf_in_aio = true;
|
||||
bufdesc->extra->aio_in_progress = true;
|
||||
/* should be after io_prep_pwrite, because io_prep_pwrite will memset iocb struct */
|
||||
iocb_ptr->data = (void *)bufdesc;
|
||||
iocb_ptr->data = (void *)tempAioExtra;
|
||||
DSSAioAppendIOCB(aio_cxt, iocb_ptr);
|
||||
} else {
|
||||
seg_physical_write(spc, fakenode, bufferinfo.blockinfo.forknum, bufdesc->extra->seg_blockno, bufToWrite,
|
||||
|
||||
@ -121,8 +121,7 @@ int dss_device_init(const char *conn_path, bool enable_dss)
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_stat", (void **)&device_op.dss_stat));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_lstat", (void **)&device_op.dss_lstat));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_fstat", (void **)&device_op.dss_fstat));
|
||||
SS_RETURN_IFERR(
|
||||
dss_load_symbol(device_op.handle, "dss_set_main_inst", (void **)&device_op.dss_set_main_inst));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_set_main_inst", (void **)&device_op.dss_set_main_inst));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_set_svr_path", (void **)&device_op.dss_set_svr_path));
|
||||
SS_RETURN_IFERR(
|
||||
dss_load_symbol(device_op.handle, "dss_register_log_callback", (void **)&device_op.dss_register_log_callback));
|
||||
@ -131,6 +130,7 @@ int dss_device_init(const char *conn_path, bool enable_dss)
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_compare_size_equal", (void **)&device_op.dss_compare_size));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_prep_pwrite", (void **)&device_op.dss_aio_pwrite));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_prep_pread", (void **)&device_op.dss_aio_pread));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_aio_post_pwrite", (void **)&device_op.dss_aio_post_pwrite));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_init_logger", (void **)&device_op.dss_init_logger));
|
||||
SS_RETURN_IFERR(dss_load_symbol(device_op.handle, "dss_refresh_logger", (void **)&device_op.dss_refresh_logger));
|
||||
|
||||
|
||||
@ -721,3 +721,8 @@ int dss_aio_prep_pread(void *iocb, int fd, void *buf, size_t count, long long of
|
||||
{
|
||||
return g_dss_device_op.dss_aio_pread(iocb, fd, buf, count, offset);
|
||||
}
|
||||
|
||||
int dss_aio_post_pwrite(void *iocb, int fd, size_t count, long long offset)
|
||||
{
|
||||
return g_dss_device_op.dss_aio_post_pwrite(iocb, fd, count, offset);
|
||||
}
|
||||
@ -503,7 +503,11 @@ void df_extend_internal(SegLogicFile *sf)
|
||||
df_extend_file_vector(sf);
|
||||
}
|
||||
int new_fd;
|
||||
new_fd = dv_open_file(filename, O_RDWR | O_CREAT, SEGMENT_FILE_MODE);
|
||||
if (ENABLE_DSS) {
|
||||
new_fd = dv_open_file(filename, O_RDWR | O_CREAT | DSS_FT_NODE_FLAG_INNER_INITED, SEGMENT_FILE_MODE);
|
||||
} else {
|
||||
new_fd = dv_open_file(filename, O_RDWR | O_CREAT, SEGMENT_FILE_MODE);
|
||||
}
|
||||
if (new_fd < 0) {
|
||||
ereport(ERROR, (errcode_for_file_access(), errmsg("[segpage] could not create file \"%s\": %m", filename)));
|
||||
}
|
||||
|
||||
@ -420,8 +420,10 @@ void SegFlushBuffer(BufferDesc *buf, SMgrRelation reln)
|
||||
securec_check(ret, "\0", "\0");
|
||||
|
||||
struct iocb *iocb_ptr = DSSAioGetIOCB(aio_cxt);
|
||||
PgwrAioExtraData* tempAioExtra = &(pgwr->aio_extra[aiobuf_id]);
|
||||
int32 io_ret = seg_physical_aio_prep_pwrite(spc, buf->tag.rnode, buf->tag.forkNum,
|
||||
buf->tag.blockNum, tempBuf, (void *)iocb_ptr);
|
||||
buf->tag.blockNum, tempBuf, (void *)iocb_ptr, (void *)tempAioExtra);
|
||||
tempAioExtra->aio_bufdesc = (void *)buf;
|
||||
if (io_ret != DSS_SUCCESS) {
|
||||
ereport(PANIC, (errmsg("dss aio failed, buffer: %d/%d/%d/%d/%d %d-%u",
|
||||
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, (int)buf->tag.rnode.bucketNode,
|
||||
@ -437,7 +439,7 @@ void SegFlushBuffer(BufferDesc *buf, SMgrRelation reln)
|
||||
buf->extra->aio_in_progress = true;
|
||||
t_thrd.dms_cxt.buf_in_aio = true;
|
||||
/* should be after io_prep_pwrite, because io_prep_pwrite will memset iocb struct */
|
||||
iocb_ptr->data = (void *)buf;
|
||||
iocb_ptr->data = (void *)tempAioExtra;
|
||||
DSSAioAppendIOCB(aio_cxt, iocb_ptr);
|
||||
} else {
|
||||
seg_physical_write(spc, buf->tag.rnode, buf->tag.forkNum, buf->tag.blockNum, (char *)buf_to_write, false);
|
||||
|
||||
@ -40,8 +40,8 @@
|
||||
#include "utils/relfilenodemap.h"
|
||||
#include "pgxc/execRemote.h"
|
||||
#include "ddes/dms/ss_transaction.h"
|
||||
#include "ddes/dms/ss_aio.h"
|
||||
#include "storage/file/fio_device.h"
|
||||
#include "libaio.h"
|
||||
|
||||
static void SSInitSegLogicFile(SegSpace *spc);
|
||||
|
||||
@ -108,7 +108,7 @@ void spc_write_block(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, con
|
||||
}
|
||||
|
||||
int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, BlockNumber blocknum,
|
||||
const char *buffer, void *iocb_ptr)
|
||||
const char *buffer, void *iocb_ptr, void *tempAioExtra)
|
||||
{
|
||||
int egid = EXTENT_TYPE_TO_GROUPID(relNode.relNode);
|
||||
SegExtentGroup *seg = &spc->extent_group[egid][forknum];
|
||||
@ -120,6 +120,7 @@ int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum
|
||||
SegPhysicalFile spf = df_get_physical_file(seg->segfile, sliceno, blocknum);
|
||||
int32 ret;
|
||||
if (is_dss_fd(spf.fd)) {
|
||||
((PgwrAioExtraData *)tempAioExtra)->aio_fd = spf.fd;
|
||||
ret = dss_aio_prep_pwrite(iocb_ptr, spf.fd, (void *)buffer, BLCKSZ, roffset);
|
||||
} else {
|
||||
io_prep_pwrite((struct iocb *)iocb_ptr, spf.fd, (void *)buffer, BLCKSZ, roffset);
|
||||
|
||||
@ -1922,12 +1922,12 @@ void seg_physical_write(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, B
|
||||
}
|
||||
|
||||
int32 seg_physical_aio_prep_pwrite(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, BlockNumber blocknum,
|
||||
const char *buffer, void *iocb_ptr)
|
||||
const char *buffer, void *iocb_ptr, void *tempAioExtra)
|
||||
{
|
||||
SegmentCheck(IsSegmentPhysicalRelNode(rNode));
|
||||
SegmentCheck(spc != NULL);
|
||||
|
||||
return spc_aio_prep_pwrite(spc, rNode, forknum, blocknum, buffer, iocb_ptr);
|
||||
return spc_aio_prep_pwrite(spc, rNode, forknum, blocknum, buffer, iocb_ptr, tempAioExtra);
|
||||
}
|
||||
static bool check_meta_data(BlockNumber extent, uint32 extent_size, uint32* offset_block)
|
||||
{
|
||||
|
||||
@ -49,6 +49,11 @@ typedef struct DSSAioCxt {
|
||||
AioUtil aio[DSS_AIO_UTIL_NUM];
|
||||
} DSSAioCxt;
|
||||
|
||||
typedef struct PgwrAioExtraData {
|
||||
int aio_fd;
|
||||
void* aio_bufdesc;
|
||||
} PgwrAioExtraData;
|
||||
|
||||
void DSSAioInitialize(DSSAioCxt *aio_cxt, aio_callback callback);
|
||||
void DSSAioDestroy(DSSAioCxt *aio_cxt);
|
||||
struct iocb* DSSAioGetIOCB(DSSAioCxt *aio_cxt);
|
||||
|
||||
@ -87,6 +87,7 @@ typedef struct PageWriterProc {
|
||||
/* auxiluary structs for implementing AIO in DSS */
|
||||
DSSAioCxt aio_cxt;
|
||||
char *aio_buf;
|
||||
PgwrAioExtraData* aio_extra;
|
||||
} PageWriterProc;
|
||||
|
||||
typedef struct PageWriterProcs {
|
||||
|
||||
@ -69,7 +69,9 @@ typedef int (*dss_get_storage_addr)(int handle, long long offset, char *poolname
|
||||
typedef int (*dss_compare_size_equal)(const char *vg_name, long long *au_size);
|
||||
typedef int (*dss_aio_prep_pwrite_device)(void *iocb, int handle, void *buf, size_t count, long long offset);
|
||||
typedef int (*dss_aio_prep_pread_device)(void *iocb, int handle, void *buf, size_t count, long long offset);
|
||||
typedef int (*dss_init_logger_t)(char *log_home, unsigned int log_level, unsigned int log_backup_file_count, unsigned long long log_max_file_size);
|
||||
typedef int (*dss_aio_post_pwrite_device)(void *iocb, int handle, size_t count, long long offset);
|
||||
typedef int (*dss_init_logger_t)(char *log_home, unsigned int log_level, unsigned int log_backup_file_count,
|
||||
unsigned long long log_max_file_size);
|
||||
typedef void (*dss_refresh_logger_t)(char *log_field, unsigned long long *value);
|
||||
typedef int (*dss_set_main)(void);
|
||||
typedef struct st_dss_device_op_t {
|
||||
@ -109,6 +111,7 @@ typedef struct st_dss_device_op_t {
|
||||
dss_compare_size_equal dss_compare_size;
|
||||
dss_aio_prep_pwrite_device dss_aio_pwrite;
|
||||
dss_aio_prep_pread_device dss_aio_pread;
|
||||
dss_aio_post_pwrite_device dss_aio_post_pwrite;
|
||||
dss_init_logger_t dss_init_logger;
|
||||
dss_refresh_logger_t dss_refresh_logger;
|
||||
dss_set_main dss_set_main_inst;
|
||||
|
||||
@ -80,5 +80,5 @@ int dss_get_addr(int handle, long long offset, char *poolname, char *imagename,
|
||||
int dss_compare_size(const char *vg_name, long long *au_size);
|
||||
int dss_aio_prep_pwrite(void *iocb, int fd, void *buf, size_t count, long long offset);
|
||||
int dss_aio_prep_pread(void *iocb, int fd, void *buf, size_t count, long long offset);
|
||||
|
||||
int dss_aio_post_pwrite(void *iocb, int fd, size_t count, long long offset);
|
||||
#endif // FIO_DSS_H
|
||||
|
||||
@ -60,7 +60,7 @@ void seg_physical_write(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, B
|
||||
XLogRecPtr seg_get_headlsn(SegSpace *spc, BlockNumber blockNum, bool isbucket);
|
||||
|
||||
int32 seg_physical_aio_prep_pwrite(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, BlockNumber blocknum,
|
||||
const char *buffer, void *iocb_ptr);
|
||||
const char *buffer, void *iocb_ptr, void *tempAioExtra);
|
||||
|
||||
/* segment sync callback */
|
||||
void forget_space_fsync_request(SegSpace *spc);
|
||||
|
||||
@ -416,7 +416,7 @@ void spc_datafile_create(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknu
|
||||
void spc_extend_file(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum, BlockNumber blkno);
|
||||
bool spc_datafile_exist(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum);
|
||||
int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, BlockNumber blocknum,
|
||||
const char *buffer, void *iocb_ptr);
|
||||
const char *buffer, void *iocb_ptr, void *tempAioExtra);
|
||||
|
||||
extern void spc_shrink_files(SegExtentGroup *seg, BlockNumber target_size, bool redo);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user