!2580 共享存储bugfix_1130

Merge pull request !2580 from 刘博文/master
This commit is contained in:
opengauss-bot
2022-12-10 03:45:08 +00:00
committed by Gitee
61 changed files with 1147 additions and 340 deletions

View File

@ -148,6 +148,10 @@ const uint16 DW_SECOND_DATA_START_IDX = DW_SECOND_BUFTAG_START_IDX + DW_SECOND_B
inline bool dw_buf_valid_dirty(uint32 buf_state)
{
if (ENABLE_DMS && ENABLE_DSS_AIO) {
return true;
}
return ((buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY));
}
@ -302,6 +306,18 @@ inline bool dw_page_writer_running()
return (dw_enabled() && pg_atomic_read_u32(&g_instance.ckpt_cxt_ctl->current_page_writer_count) > 0);
}
/**
* If enable dms and aio, the aio_in_process should be false.
*/
inline bool dw_buf_valid_aio_finished(BufferDesc *buf_desc, uint32 buf_state)
{
if (!ENABLE_DMS || !ENABLE_DSS_AIO) {
return true;
}
return ((buf_state & BM_VALID) && ((buf_state & BM_DIRTY) || buf_desc->aio_in_progress));
}
extern bool free_space_enough(int buf_id);
extern void dw_generate_single_file();

View File

@ -57,6 +57,11 @@
#define XLogSegSize XLogSegmentSize
#define XLogSegmentsPerXLogId (UINT64CONST(0x100000000) / XLogSegmentSize)
#define XLogRecordMaxSize ((uint32)0x3fffe000) /* 1 gigabyte - 8 kbyte */
#define XLogBaseSize (16ULL * 1024 * 1024)
#define XLogSegmentsNum(val) (((val) * XLogBaseSize + XLogSegSize - 1) / XLogSegSize)
#define XLogPreReadSize 67108864 // 64MB
/* Compute XLogRecPtr with segment number and offset. */
#define XLogSegNoOffsetToRecPtr(segno, offset, dest) \
@ -84,6 +89,9 @@
#define XLByteInPrevSeg(xlrp, logSegNo) ((((xlrp)-1) / XLogSegSize) == (logSegNo))
#define XLByteInPreReadBuf(xlrp, preReadStartPtr) \
(((xlrp) >= (preReadStartPtr)) && ((xlrp) < (preReadStartPtr + XLogPreReadSize)))
/* Check if an XLogRecPtr value is in a plausible range */
#define XRecOffIsValid(xlrp) ((xlrp) % XLOG_BLCKSZ >= SizeOfXLogShortPHD)
@ -103,6 +111,7 @@
#define InvalidRepOriginId 0
#define InvalidXlogPreReadStartPtr 0xFFFFFFFFFFFFFFFF
/*
* Block IDs used to distinguish different kinds of record fragments. Block
@ -300,6 +309,12 @@ struct XLogReaderState {
char* readBuf;
uint32 readLen;
char* readBufOrigin;
/* per-reading for dss */
XLogRecPtr preReadStartPtr;
char* preReadBuf;
char* preReadBufOrigin;
/* last read segment, segment offset, TLI for data currently in readBuf */
XLogSegNo readSegNo;
uint32 readOff;

View File

@ -201,6 +201,7 @@ typedef struct st_dms_cr {
unsigned long long query_scn;
unsigned int ssn;
char *page;
unsigned char *fb_mark;
} dms_cr_t;
typedef struct st_dms_opengauss_xid_csn {
@ -263,22 +264,19 @@ typedef struct st_dms_buf_ctrl {
// can be discard only after latest version in other instance is cleaned
volatile unsigned char is_edp;
volatile unsigned char force_request; // force to request page from remote
volatile unsigned char need_chk_master; // suport owner transfer page again
volatile unsigned char need_flush; // for recovery, owner is abort, copy instance should flush before release
unsigned long long edp_scn; // set when become edp, lastest scn when page becomes edp
unsigned long long edp_map; // records edp instance
long long last_ckpt_time; // last time when local edp page is added to group.
unsigned long long ver;
#ifdef OPENGAUSS
int buf_id;
unsigned int state;
unsigned int pblk_relno;
unsigned int pblk_blkno;
unsigned long long pblk_lsn;
unsigned long long lsn_on_disk;
unsigned char seg_fileno;
unsigned int seg_blockno;
#endif
}dms_buf_ctrl_t;
} dms_buf_ctrl_t;
typedef enum en_dms_page_latch_mode {
DMS_PAGE_LATCH_MODE_S = 1,
@ -431,6 +429,21 @@ typedef enum en_dms_role {
DMS_ROLE_PARTNER = 2
} dms_role_t;
typedef enum en_reform_phase {
DMS_PHASE_START = 0,
DMS_PHASE_AFTER_RECOVERY = 1,
DMS_PHASE_BEFORE_DC_INIT = 2,
DMS_PHASE_BEFORE_ROLLBACK = 3,
DMS_PHASE_END = 4,
} reform_phase_t;
typedef enum en_dms_status {
DMS_STATUS_OUT = 0,
DMS_STATUS_JOIN = 1,
DMS_STATUS_REFORM = 2,
DMS_STATUS_IN = 3
} dms_status_t; // used in database startup
#define DCS_BATCH_BUF_SIZE (1024 * 30)
#define DCS_RLS_OWNER_BATCH_SIZE (DCS_BATCH_BUF_SIZE / DMS_PAGEID_SIZE)
typedef struct st_dcs_batch_buf {
@ -445,20 +458,21 @@ typedef int(*dms_save_list_stable)(void *db_handle, unsigned long long list_stab
typedef int(*dms_get_dms_status)(void *db_handle);
typedef void(*dms_set_dms_status)(void *db_handle, int status);
typedef int(*dms_confirm_converting)(void *db_handle, char *pageid, unsigned char smon_chk,
unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn);
unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn, unsigned long long *ver);
typedef int(*dms_confirm_owner)(void *db_handle, char *pageid, unsigned char *lock_mode, unsigned char *is_edp,
unsigned long long *lsn);
typedef int(*dms_flush_copy)(void *db_handle, char *pageid);
typedef int(*dms_edp_lsn)(void *db_handle, char *pageid, unsigned long long *lsn);
typedef int(*dms_disk_lsn)(void *db_handle, char *pageid, unsigned long long *lsn);
typedef int(*dms_recovery)(void *db_handle, void *recovery_list, void *remove_list, int is_reformer);
typedef int(*dms_recovery)(void *db_handle, void *recovery_list, int is_reformer);
typedef int(*dms_opengauss_startup)(void *db_handle);
typedef int(*dms_opengauss_recovery_standby)(void *db_handle, int inst_id);
typedef int(*dms_opengauss_recovery_primary)(void *db_handle, int inst_id);
typedef void(*dms_reform_start_notify)(void *db_handle, dms_role_t role);
typedef void(*dms_reform_start_notify)(void *db_handle, dms_role_t role, unsigned char reform_type);
typedef int(*dms_undo_init)(void *db_handle, unsigned char inst_id);
typedef int(*dms_tx_area_init)(void *db_handle, unsigned char inst_id);
typedef int(*dms_tx_area_load)(void *db_handle, unsigned char inst_id);
typedef int (*dms_tx_area_load)(void *db_handle, unsigned char inst_id);
typedef int (*dms_tx_rollback_finish)(void *db_handle, unsigned char inst_id);
typedef unsigned char(*dms_recovery_in_progress)(void *db_handle);
typedef unsigned int(*dms_get_page_hash_val)(const char pageid[DMS_PAGEID_SIZE]);
typedef unsigned long long(*dms_get_page_lsn)(const dms_buf_ctrl_t *buf_ctrl);
@ -480,7 +494,7 @@ typedef unsigned char(*dms_page_is_dirty)(dms_buf_ctrl_t *buf_ctrl);
typedef void(*dms_leave_local_page)(void *db_handle, dms_buf_ctrl_t *buf_ctrl);
typedef void(*dms_get_pageid)(dms_buf_ctrl_t *buf_ctrl, char **pageid, unsigned int *size);
typedef char *(*dms_get_page)(dms_buf_ctrl_t *buf_ctrl);
typedef void (*dms_invalidate_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE]);
typedef int (*dms_invalidate_page)(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned long long ver);
typedef void *(*dms_get_db_handle)(unsigned int *db_handle_index);
typedef void *(*dms_stack_push_cr_cursor)(void *db_handle);
typedef void (*dms_stack_pop_cr_cursor)(void *db_handle);
@ -492,9 +506,11 @@ typedef void(*dms_init_check_cr_cursor)(void *cr_cursor, char rowid[DMS_ROWID_SI
unsigned long long query_scn, unsigned int ssn);
typedef char *(*dms_get_wxid_from_cr_cursor)(void *cr_cursor);
typedef unsigned char(*dms_get_instid_of_xid_from_cr_cursor)(void *db_handle, void *cr_cursor);
typedef int(*dms_get_page_invisible_txn_list)(void *db_handle, void *cr_cursor, void *cr_page,
unsigned char *is_empty_txn_list, unsigned char *exist_waiting_txn);
typedef int(*dms_reorganize_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page);
typedef int (*dms_get_page_invisible_txn_list)(void *db_handle, void *cr_cursor, void *cr_page,
unsigned char *is_empty_txn_list, unsigned char *exist_waiting_txn);
typedef int (*dms_reorganize_heap_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page,
unsigned char *fb_mark);
typedef int (*dms_reorganize_index_page_with_undo)(void *db_handle, void *cr_cursor, void *cr_page);
typedef int(*dms_check_heap_page_visible_with_undo_snapshot)(void *db_handle, void *cr_cursor, void *page,
unsigned char *is_found);
typedef void(*dms_set_page_force_request)(void *db_handle, char pageid[DMS_PAGEID_SIZE]);
@ -525,7 +541,7 @@ typedef int(*dms_get_txn_snapshot)(void *db_handle, unsigned int xmap, dms_txn_s
typedef int(*dms_get_opengauss_txn_snapshot)(void *db_handle, dms_opengauss_txn_snapshot_t *txn_snapshot);
typedef void (*dms_log_output)(dms_log_id_t log_type, dms_log_level_t log_level, const char *code_file_name,
unsigned int code_line_num, const char *module_name, const char *format, ...);
typedef void (*dms_log_flush)(void *db_handle, unsigned long long *lsn);
typedef int (*dms_log_flush)(void *db_handle, unsigned long long *lsn);
typedef int(*dms_process_edp)(void *db_handle, dms_edp_info_t *pages, unsigned int count);
typedef void (*dms_clean_ctrl_edp)(void *db_handle, dms_buf_ctrl_t *dms_ctrl);
typedef char *(*dms_display_pageid)(char *display_buf, unsigned int count, char *pageid);
@ -537,12 +553,12 @@ typedef void (*dms_check_if_build_complete)(void *db_handle, unsigned int *build
typedef int (*dms_db_is_primary)(void *db_handle);
typedef void (*dms_set_switchover_result)(void *db_handle, int result);
typedef void (*dms_set_db_standby)(void *db_handle);
typedef int (*dms_load_tablespace)(void *db_handle, unsigned int *has_offline);
typedef int (*dms_mount_to_recovery)(void *db_handle, unsigned int *has_offline);
typedef int(*dms_get_open_status)(void *db_handle);
// for openGauss
typedef void (*dms_thread_init_t)(unsigned char need_startup, char **reg_data);
typedef int (*dms_get_db_primary_id)(void *db_handle, unsigned int *primary_id);
typedef int (*dms_set_buf_info)(dms_buf_ctrl_t *buf_ctrl);
// for ssl
typedef int(*dms_decrypt_pwd_t)(const char *cipher, unsigned int len, char *plain, unsigned int size);
@ -564,7 +580,9 @@ typedef int (*dms_switchover_demote)(void *db_handle);
typedef int (*dms_switchover_promote)(void *db_handle);
typedef int (*dms_switchover_promote_opengauss)(void *db_handle, unsigned char origPrimaryId);
typedef int (*dms_failover_promote_opengauss)(void *db_handle);
typedef int (*dms_refresh_point)(void *db_handle);
typedef int (*dms_reform_done_notify)(void *db_handle);
typedef int (*dms_log_wait_flush)(void *db_handle, unsigned long long lsn);
typedef int (*dms_wait_ckpt)(void *db_handle);
typedef struct st_dms_callback {
// used in reform
@ -579,9 +597,11 @@ typedef struct st_dms_callback {
dms_disk_lsn disk_lsn;
dms_recovery recovery;
dms_db_is_primary db_is_primary;
dms_get_open_status get_open_status;
dms_undo_init undo_init;
dms_tx_area_init tx_area_init;
dms_tx_area_load tx_area_load;
dms_tx_rollback_finish tx_rollback_finish;
dms_recovery_in_progress recovery_in_progress;
dms_drc_buf_res_rebuild dms_reform_rebuild_buf_res;
dms_check_if_build_complete check_if_build_complete;
@ -593,7 +613,6 @@ typedef struct st_dms_callback {
dms_opengauss_recovery_standby opengauss_recovery_standby;
dms_opengauss_recovery_primary opengauss_recovery_primary;
dms_reform_start_notify reform_start_notify;
dms_set_buf_info set_buf_info;
dms_get_page_hash_val get_page_hash_val;
dms_get_page_lsn get_page_lsn;
@ -608,7 +627,6 @@ typedef struct st_dms_callback {
dms_get_page_lfn get_page_lfn;
dms_get_global_flushed_lfn get_global_flushed_lfn;
dms_read_local_page4transfer read_local_page4transfer;
dms_try_read_local_page try_read_local_page;
dms_page_is_dirty page_is_dirty;
dms_leave_local_page leave_local_page;
dms_get_pageid get_pageid;
@ -624,8 +642,8 @@ typedef struct st_dms_callback {
dms_get_instid_of_xid_from_cr_cursor get_instid_of_xid_from_cr_cursor;
dms_get_page_invisible_txn_list get_heap_invisible_txn_list;
dms_get_page_invisible_txn_list get_index_invisible_txn_list;
dms_reorganize_page_with_undo reorganize_heap_page_with_undo;
dms_reorganize_page_with_undo reorganize_index_page_with_undo;
dms_reorganize_heap_page_with_undo reorganize_heap_page_with_undo;
dms_reorganize_index_page_with_undo reorganize_index_page_with_undo;
dms_check_heap_page_visible_with_undo_snapshot check_heap_page_visible_with_udss;
dms_set_page_force_request set_page_force_request;
dms_get_entry_pageid_from_cr_cursor get_entry_pageid_from_cr_cursor;
@ -676,9 +694,11 @@ typedef struct st_dms_callback {
dms_failover_promote_opengauss failover_promote_opengauss;
dms_set_switchover_result set_switchover_result;
dms_set_db_standby set_db_standby;
dms_load_tablespace load_tablespace;
dms_mount_to_recovery mount_to_recovery;
dms_refresh_point refresh_point;
dms_reform_done_notify reform_done_notify;
dms_log_wait_flush log_wait_flush;
dms_wait_ckpt wait_ckpt;
} dms_callback_t;
typedef struct st_dms_instance_net_addr {
@ -739,7 +759,7 @@ typedef struct st_dms_profile {
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
#define DMS_LOCAL_MAJOR_VERSION 0
#define DMS_LOCAL_MINOR_VERSION 0
#define DMS_LOCAL_VERSION 22
#define DMS_LOCAL_VERSION 35
#ifdef __cplusplus
}

View File

@ -0,0 +1,63 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* ss_aio.h
* aio interface.
*
* IDENTIFICATION
* src/include/ddes/dms/ss_aio.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef __SS_AIO_H__
#define __SS_AIO_H__
#include "libaio.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef void (*aio_callback)(struct io_event *event);
#define DSS_AIO_BATCH_SIZE 128
#define DSS_AIO_UTIL_NUM 2
typedef struct AioUtil {
io_context_t handle;
struct iocb iocbs[DSS_AIO_BATCH_SIZE];
struct iocb *iocbs_ptr[DSS_AIO_BATCH_SIZE];
struct io_event events[DSS_AIO_BATCH_SIZE];
int iocount;
} AioUtil;
typedef struct DSSAioCxt {
bool initialized;
aio_callback aiocb;
int index;
AioUtil aio[DSS_AIO_UTIL_NUM];
} DSSAioCxt;
void DSSAioInitialize(DSSAioCxt *aio_cxt, aio_callback callback);
void DSSAioDestroy(DSSAioCxt *aio_cxt);
struct iocb* DSSAioGetIOCB(DSSAioCxt *aio_cxt);
int DSSAioGetIOCBIndex(DSSAioCxt *aio_cxt);
void DSSAioAppendIOCB(DSSAioCxt *aio_cxt, struct iocb *iocb_ptr);
void DSSAioFlush(DSSAioCxt *aio_cxt);
#ifdef __cplusplus
}
#endif
#endif /* __SS_AIO_H__ */

View File

@ -97,6 +97,7 @@
#define BUF_IS_RELPERSISTENT 0x20
#define BUF_IS_RELPERSISTENT_TEMP 0x40
#define BUF_READ_MODE_ZERO_LOCK 0x80
#define BUF_DIRTY_NEED_FLUSH 0x100
#define SS_BROADCAST_FAILED_RETRYCOUNTS 4
#define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFF)
@ -135,4 +136,16 @@ typedef struct SSBroadcastCmdOnly {
SSBroadcastOp type; // must be first
} SSBroadcastCmdOnly;
typedef enum SSReformType {
DMS_REFORM_TYPE_FOR_NORMAL = 0,
DMS_REFORM_TYPE_FOR_BUILD,
DMS_REFORM_TYPE_FOR_FAILOVER,
DMS_REFORM_TYPE_FOR_SWITCHOVER,
DMS_REFORM_TYPE_FOR_OPENGAUSS,
DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS,
DMS_REFORM_TYPE_FOR_SWITCHOVER_OPENGAUSS,
DMS_REFORM_TYPE_FOR_FULL_CLEAN,
DMS_REFORM_TYPE_FOR_MAINTAIN
} SSReformType;
#endif

View File

@ -72,6 +72,7 @@ typedef struct st_ss_dms_func {
bool (*dms_latch_timed_s)(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks,
unsigned char is_force);
void (*dms_unlatch)(dms_context_t *dms_ctx, dms_drlatch_t *dlatch);
void (*dms_pre_uninit)(void);
} ss_dms_func_t;
int ss_dms_func_init();
@ -109,6 +110,7 @@ int dms_reform_last_failed(void);
bool dms_latch_timed_x(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks);
bool dms_latch_timed_s(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned int wait_ticks, unsigned char is_force);
void dms_unlatch(dms_context_t *dms_ctx, dms_drlatch_t *dlatch);
void dms_pre_uninit(void);
#ifdef __cplusplus
}
#endif

View File

@ -66,6 +66,8 @@ int SSLockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock);
void SSLockReleaseAll();
void SSLockAcquireAll();
void MarkReadPblk(int buf_id, const XLogPhyBlock *pblk);
void SSCheckBufferIfNeedMarkDirty(Buffer buf);
void SSRecheckBufferPool();
void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag);
#endif

View File

@ -34,6 +34,7 @@
#define SSSKIP_REDO_REPLAY (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.skip_redo_replay == true)
#define SS_BEFORE_RECOVERY (ENABLE_DMS && g_instance.dms_cxt.SSReformInfo.in_reform == true \
&& g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag == true)
#define SS_IN_FAILOVER (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.in_failover == true)
typedef struct st_reformer_ctrl {
uint64 list_stable; // stable instances list
@ -60,12 +61,13 @@ typedef struct ss_recovery_info {
bool reform_ready;
bool in_failover; // used to judge this is failover, this tag will combine with failover_triggered later
// in failover Scenario,before failover_triggered become true, this node knows itself will become new primary
bool in_flushcopy;
} ss_recovery_info_t;
extern bool SSRecoveryNodes();
extern int SSGetPrimaryInstId();
extern void SSSavePrimaryInstId(int id);
extern void SSReadControlFile(int id);
extern void SSReadControlFile(int id, bool updateDmsCtx = false);
extern void SSWriteReformerControlPages(void);
extern bool SSRecoveryApplyDelay(const XLogReaderState *record);
extern void SShandle_promote_signal();

View File

@ -36,6 +36,8 @@ typedef struct SSBroadcastCancelTrx {
int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path);
bool SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, char *buf);
XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize);
void SSGetXlogPath();
void SSSaveReformerCtrl();
void SSClearSegCache();

View File

@ -98,6 +98,7 @@ typedef struct knl_instance_attr_dss {
typedef struct knl_instance_attr_dms {
bool enable_dms;
bool enable_catalog_centralized;
bool enable_dss_aio;
int instance_id;
int recv_msg_pool_size;
char* interconnect_url;

View File

@ -112,16 +112,6 @@ enum knl_parallel_redo_state {
REDO_DONE,
};
/*
* used for dms
*/
typedef enum en_dms_status {
DMS_STATUS_OUT = 0,
DMS_STATUS_JOIN = 1,
DMS_STATUS_REFORM = 2,
DMS_STATUS_IN = 3
} dms_status_t;
/* all process level attribute which expose to user */
typedef struct knl_instance_attr {
@ -1206,7 +1196,10 @@ typedef struct knl_g_dms_context {
pg_atomic_uint32 inDmsThreShmemInitCnt; // the count of threads in DmsCallbackThreadShmemInit
pg_atomic_uint32 inProcExitCnt; // Post Main in proc_exit function
bool dmsInited;
}knl_g_dms_context;
XLogRecPtr ckptRedo;
bool resetSyscache;
bool finishedRecoverOldPrimaryDWFile;
} knl_g_dms_context;
typedef struct knl_instance_context {
knl_virtual_role role;

View File

@ -3319,6 +3319,7 @@ typedef struct knl_t_publication_context {
typedef struct knl_t_dms_context {
MemoryContext msgContext;
bool buf_in_aio;
} knl_t_dms_context;
/* thread context. */

View File

@ -466,7 +466,8 @@ extern bool CheckExecDirectPrivilege(const char* query); /* check user have priv
u_sess->misc_cxt.Mode = (mode); \
} while (0)
#define ENABLE_DSS (g_instance.attr.attr_storage.dss_attr.ss_enable_dss == true)
#define ENABLE_DSS (g_instance.attr.attr_storage.dss_attr.ss_enable_dss)
#define ENABLE_DSS_AIO (ENABLE_DSS && g_instance.attr.attr_storage.dms_attr.enable_dss_aio && !IsInitdb)
/*
* Auxiliary-process type identifiers.

View File

@ -27,6 +27,7 @@
#include "storage/buf/buf.h"
#include "storage/lock/lwlock.h"
#include "catalog/pg_control.h"
#include "ddes/dms/ss_aio.h"
#define ENABLE_INCRE_CKPT g_instance.attr.attr_storage.enableIncrementalCheckpoint
#define NEED_CONSIDER_USECOUNT u_sess->attr.attr_storage.enable_candidate_buf_usage_count
@ -82,6 +83,10 @@ typedef struct PageWriterProc {
CandidateList normal_list;
CandidateList nvm_list;
CandidateList seg_list;
/* auxiluary structs for implementing AIO in DSS */
DSSAioCxt aio_cxt;
char *aio_buf;
} PageWriterProc;
typedef struct PageWriterProcs {

View File

@ -211,6 +211,8 @@ typedef struct BufferDesc {
bool encrypt; /* enable table's level data encryption */
volatile uint64 lsn_on_disk;
volatile bool aio_in_progress; /* indicate aio is in progress */
#ifdef USE_ASSERT_CHECKING
volatile uint64 lsn_dirty;
#endif

View File

@ -68,6 +68,8 @@ typedef void (*dss_error_info)(int *errorcode, const char **errormsg);
typedef void (*dss_svr_path)(const char *conn_path);
typedef void (*dss_log_callback)(dss_log_output cb_log_output);
typedef int (*dss_version)(void);
typedef int (*dss_aio_prep_pwrite_device)(void *iocb, int handle, void *buf, size_t count, long long offset);
typedef int (*dss_aio_prep_pread_device)(void *iocb, int handle, void *buf, size_t count, long long offset);
typedef struct st_dss_device_op_t {
void *handle;
dss_create_device dss_create;
@ -104,6 +106,8 @@ typedef struct st_dss_device_op_t {
dss_svr_path dss_set_svr_path;
dss_log_callback dss_register_log_callback;
dss_version dss_get_version;
dss_aio_prep_pwrite_device dss_aio_pwrite;
dss_aio_prep_pread_device dss_aio_pread;
} dss_device_op_t;
void dss_register_log_callback(dss_log_output cb_log_output);

View File

@ -33,7 +33,6 @@
#include "storage/dss/dss_adaptor.h"
void dss_device_register(dss_device_op_t *dss_device_op, bool enable_dss);
void dss_set_errno(int *errcode);
bool dss_exist_file(const char *file_name);
int dss_access_file(const char *file_name, int mode);
@ -80,4 +79,7 @@ int dss_chmod_file(const char* path, mode_t mode);
int dss_set_server_status_wrapper(bool is_master);
int dss_remove_dev(const char *name);
int dss_aio_prep_pwrite(void *iocb, int fd, void *buf, size_t count, long long offset);
int dss_aio_prep_pread(void *iocb, int fd, void *buf, size_t count, long long offset);
#endif // FIO_DSS_H

View File

@ -56,6 +56,9 @@ void seg_physical_write(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, B
bool skipFsync);
XLogRecPtr seg_get_headlsn(SegSpace *spc, BlockNumber blockNum, bool isbucket);
int32 seg_physical_aio_prep_pwrite(SegSpace *spc, RelFileNode &rNode, ForkNumber forknum, BlockNumber blocknum,
const char *buffer, void *iocb_ptr);
/* segment sync callback */
void forget_space_fsync_request(SegSpace *spc);
void seg_register_dirty_file(SegLogicFile *sf, int segno);

View File

@ -35,6 +35,7 @@
#include "storage/smgr/smgr.h"
#include "storage/file/fio_device_com.h"
#include "utils/segment_test.h"
#include "libaio.h"
const int DF_MAP_GROUP_RESERVED = 3;
const int DF_MAX_MAP_GROUP_CNT = 33;
@ -102,6 +103,7 @@ void df_create_file(SegLogicFile *sf, bool redo);
void df_shrink(SegLogicFile *sf, BlockNumber target);
void df_flush_data(SegLogicFile *sf, BlockNumber blocknum, BlockNumber nblocks);
bool df_ss_update_segfile_size(SegLogicFile *sf, BlockNumber target_block);
SegPhysicalFile df_get_physical_file(SegLogicFile *sf, int sliceno, BlockNumber target_block);
/*
* Data files status in the segment space;
@ -412,6 +414,8 @@ BlockNumber spc_size(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum);
void spc_datafile_create(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum);
void spc_extend_file(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum, BlockNumber blkno);
bool spc_datafile_exist(SegSpace *spc, BlockNumber egRelNode, ForkNumber forknum);
int32 spc_aio_prep_pwrite(SegSpace *spc, RelFileNode relNode, ForkNumber forknum, BlockNumber blocknum,
const char *buffer, void *iocb_ptr);
extern void spc_shrink_files(SegExtentGroup *seg, BlockNumber target_size, bool redo);