diff --git a/src/gausskernel/ddes/adapter/ss_dms.cpp b/src/gausskernel/ddes/adapter/ss_dms.cpp index 6fe490817..76d107e2d 100644 --- a/src/gausskernel/ddes/adapter/ss_dms.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms.cpp @@ -206,10 +206,9 @@ int dms_request_page(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, dms_lock_mode return g_ss_dms_func.dms_request_page(dms_ctx, ctrl, mode); } -int dms_broadcast_msg(dms_context_t *dms_ctx, char *data, unsigned int len, unsigned char handle_recv_msg, - unsigned int timeout) +int dms_broadcast_msg(dms_context_t *dms_ctx, dms_broadcast_info_t *dms_broad_info) { - return g_ss_dms_func.dms_broadcast_msg(dms_ctx, data, len, handle_recv_msg, timeout); + return g_ss_dms_func.dms_broadcast_msg(dms_ctx, dms_broad_info); } int dms_request_opengauss_update_xid(dms_context_t *dms_ctx, unsigned short t_infomask, unsigned short t_infomask2, diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 056071bd8..4f3a79dcd 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -64,6 +64,7 @@ void InitDmsBufCtrl(void) void InitDmsContext(dms_context_t *dmsContext) { + (void)memset_s(dmsContext, sizeof(dms_context_t), 0, sizeof(dms_context_t)); /* Proc threads id range: [0, TotalProcs - 1]. Non-proc threads id range: [TotalProcs + 1, TotalProcs + 4] */ uint32 TotalProcs = (uint32)(GLOBAL_ALL_PROCS); dmsContext->inst_id = (unsigned int)SS_MY_INST_ID; @@ -87,8 +88,15 @@ void InitDmsBufContext(dms_context_t* dmsBufCxt, BufferTag buftag) void TransformLockTagToDmsLatch(dms_drlatch_t* dlatch, const LOCKTAG locktag) { - DmsInitLatch(&dlatch->drid, locktag.locktag_type, locktag.locktag_field1, locktag.locktag_field2, - locktag.locktag_field3, locktag.locktag_field4, locktag.locktag_field5); + int32 ret = memset_sp(&dlatch->drid, sizeof(dms_drid_t), 0, sizeof(dms_drid_t)); + securec_check(ret, "", ""); + + dlatch->drid.type = locktag.locktag_type; + dlatch->drid.oid = locktag.locktag_field1; + dlatch->drid.index = locktag.locktag_field2; + dlatch->drid.parent_part = locktag.locktag_field3; + dlatch->drid.part = locktag.locktag_field4; + dlatch->drid.uid = locktag.locktag_field5; } static void CalcSegDmsPhysicalLoc(BufferDesc* buf_desc, Buffer buffer, bool check_standby) diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index 67ca5a9bf..92d5afa89 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -476,9 +476,19 @@ void SSSendLatestSnapshotToStandby(TransactionId xmin, TransactionId xmax, Commi latest_snapshot.xmax = xmax; latest_snapshot.csn = csn; latest_snapshot.type = BCAST_SEND_SNAPSHOT; + dms_broadcast_info_t dms_broad_info = { + .data = (char *)&latest_snapshot, + .len = sizeof(SSBroadcastSnapshot), + .output = NULL, + .output_len = NULL, + .scope = DMS_BROADCAST_ONLINE_LIST, + .inst_map = 0, + .timeout = SS_BROADCAST_WAIT_ONE_SECOND, + .handle_recv_msg = (unsigned char)false, + .check_session_kill = (unsigned char)true + }; do { - ret = dms_broadcast_msg(&dms_ctx, (char *)&latest_snapshot, sizeof(SSBroadcastSnapshot), - (unsigned char)false, SS_BROADCAST_WAIT_ONE_SECOND); + ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info); if (ret == DMS_SUCCESS) { return; @@ -556,9 +566,19 @@ bool SSCheckDbBackendsFromAllStandby(Oid dbid) SSBroadcastDbBackends backends_data; backends_data.type = BCAST_CHECK_DB_BACKENDS; backends_data.dbid = dbid; + dms_broadcast_info_t dms_broad_info = { + .data = (char *)&backends_data, + .len = sizeof(SSBroadcastDbBackends), + .output = NULL, + .output_len = NULL, + .scope = DMS_BROADCAST_ONLINE_LIST, + .inst_map = 0, + .timeout = SS_BROADCAST_WAIT_FIVE_SECONDS, + .handle_recv_msg = (unsigned char)true, + .check_session_kill = (unsigned char)true + }; - int ret = dms_broadcast_msg(&dms_ctx, (char *)&backends_data, sizeof(SSBroadcastDbBackends), - (unsigned char)true, SS_BROADCAST_WAIT_FIVE_SECONDS); + int ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info); if (ret != DMS_NO_RUNNING_BACKENDS) { return true; } @@ -572,9 +592,19 @@ void SSRequestAllStandbyReloadReformCtrlPage() int ret; SSBroadcastCmdOnly ssmsg; ssmsg.type = BCAST_RELOAD_REFORM_CTRL_PAGE; + dms_broadcast_info_t dms_broad_info = { + .data = (char *)&ssmsg, + .len = sizeof(SSBroadcastCmdOnly), + .output = NULL, + .output_len = NULL, + .scope = DMS_BROADCAST_ONLINE_LIST, + .inst_map = 0, + .timeout = SS_BROADCAST_WAIT_ONE_SECOND, + .handle_recv_msg = (unsigned char)false, + .check_session_kill = (unsigned char)true + }; do { - ret = dms_broadcast_msg(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastCmdOnly), - (unsigned char)false, SS_BROADCAST_WAIT_ONE_SECOND); + ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info); if (ret == DMS_SUCCESS) { return; @@ -734,8 +764,18 @@ void SSUpdateSegDropTimeline(uint32 seg_drop_timeline) ssmsg.seg_drop_timeline = seg_drop_timeline; int output_backup = t_thrd.postgres_cxt.whereToSendOutput; t_thrd.postgres_cxt.whereToSendOutput = DestNone; - int ret = dms_broadcast_msg(&dms_ctx, (char *)&ssmsg, sizeof(SSBroadcastSegDropTL), (unsigned char)false, - SS_BROADCAST_WAIT_FIVE_SECONDS); + dms_broadcast_info_t dms_broad_info = { + .data = (char *)&ssmsg, + .len = sizeof(SSBroadcastSegDropTL), + .output = NULL, + .output_len = NULL, + .scope = DMS_BROADCAST_ONLINE_LIST, + .inst_map = 0, + .timeout = SS_BROADCAST_WAIT_FIVE_SECONDS, + .handle_recv_msg = (unsigned char)false, + .check_session_kill = (unsigned char)true + }; + int ret = dms_broadcast_msg(&dms_ctx, &dms_broad_info); if (ret != DMS_SUCCESS) { ereport(DEBUG1, (errmsg("SS broadcast seg_drop_timeline failed!"))); } diff --git a/src/gausskernel/ddes/ddes_commit_id b/src/gausskernel/ddes/ddes_commit_id index e0a2d4b0f..c10b14c34 100644 --- a/src/gausskernel/ddes/ddes_commit_id +++ b/src/gausskernel/ddes/ddes_commit_id @@ -1,3 +1,3 @@ -dms_commit_id=9d35e3ef0b9dbc9fe1a2b44cea8249fc57c04c36 -dss_commit_id=4fff8fd08f060ce471280527e3d24cf5598db2ec -cbb_commit_id=ddb5348ea1d45b1c5b6ed02479f5558536d1c026 \ No newline at end of file +dms_commit_id=152a5d5bcaedfb9fbf165a3b8aae668963a4b026 +dss_commit_id=eee3cd7bfe8b0f3b4a11b77a890ca2aa08ac1dc8 +cbb_commit_id=f22210b16f1edcc9c996299cdaa1097dbb3f4016 \ No newline at end of file diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index f08db3c23..1ec56f3f0 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -25,6 +25,7 @@ #define __DMS_API_H__ #include +#include #ifdef __cplusplus extern "C" { #endif @@ -33,7 +34,7 @@ extern "C" { #define DMS_LOCAL_MINOR_VER_WEIGHT 1000 #define DMS_LOCAL_MAJOR_VERSION 0 #define DMS_LOCAL_MINOR_VERSION 0 -#define DMS_LOCAL_VERSION 135 +#define DMS_LOCAL_VERSION 146 #define DMS_SUCCESS 0 #define DMS_ERROR (-1) @@ -50,6 +51,7 @@ extern "C" { #define DMS_MAX_IP_LEN 64 #define DMS_MAX_INSTANCES 64 #define DMS_MAX_NAME_LEN 64 +#define DMS_MAX_RESOURCE_NAME_LEN 136 #define DMS_VERSION_MAX_LEN 256 #define DMS_OCK_LOG_PATH_LEN 256 @@ -59,6 +61,13 @@ extern "C" { #define DMS_MAX_XA_BASE16_GTRID_LEN (128) #define DMS_MAX_XA_BASE16_BQUAL_LEN (128) +#define DB_FI_ENTRY_BEGIN 10000 +#define DB_FI_ENTRY_COUNT 1024 +#define FI_ENTRY_END (DB_FI_ENTRY_BEGIN + DB_FI_ENTRY_COUNT) +#define MAX_FI_ENTRY_COUNT 2000 + +#define MAX_DMS_THREAD_NUM 512 + typedef enum en_dms_online_status { DMS_ONLINE_STATUS_OUT = 0, DMS_ONLINE_STATUS_JOIN = 1, @@ -95,12 +104,15 @@ typedef enum en_dms_dr_type { DMS_DR_TYPE_SEQVAL = 25, DMS_DR_TYPE_SHARED_INNODE = 26, DMS_DR_TYPE_PROC_ENTRY = 27, - DMS_DR_TYPE_PART_TABLE, + DMS_DR_TYPE_PART_TABLE = 28, + DMS_DR_TYPE_SS_LINK = 29, + DMS_DR_TYPE_TX_ALCK = 30, + DMS_DR_TYPE_SE_ALCK = 31, DMS_DR_TYPE_MAX, } dms_dr_type_t; #define DMS_DR_IS_TABLE_TYPE(type) ((type) == DMS_DR_TYPE_TABLE || (type) == DMS_DR_TYPE_PART_TABLE) - +#define DMS_DR_IS_ALOCK_TYPE(type) ((type) == DMS_DR_TYPE_TX_ALCK || (type) == DMS_DR_TYPE_SE_ALCK) // persistent distributed resource id typedef enum en_dms_persistent_id { DMS_ID_DATABASE_CTRL = 0, @@ -117,10 +129,10 @@ typedef enum en_dms_persistent_id { // for smon deadlock check #define DMS_SMON_DLOCK_MSG_MAX_LEN 24 -#define DMS_SMON_TLOCK_MSG_MAX_LEN 24 -#define DMS_SMON_ILOCK_MSG_MAX_LEN 60 +#define DMS_SMON_TLOCK_MSG_MAX_LEN 56 #define DMS_SMON_MAX_SQL_LEN 10240 // The maximum size of a message to be transferred in the MES is 32 KB. -#define MAX_TABLE_LOCK_NUM 2048 +#define MAX_TABLE_LOCK_NUM 512 +#define DMS_MAX_W_MARKS_NUM (16320 * 64) typedef enum en_dms_smon_req_type { DMS_SMON_REQ_SID_BY_RMID = 0, @@ -128,23 +140,13 @@ typedef enum en_dms_smon_req_type { DMS_SMON_REQ_ROWID_BY_RMID = 2, }dms_smon_req_type_t; -typedef enum en_dms_smon_req_tlock_type { - DMS_SMON_REQ_TABLE_LOCK_SHARED_MSG = 0, - DMS_SMON_REQ_TABLE_LOCK_EXCLU_MSG = 1, - DMS_SMON_REQ_TABLE_LOCK_ALL_MSG = 2, -}dms_smon_req_tlock_type_t; - typedef enum en_dms_smon_req_rm_type { - DMS_SMON_REQ_TABLE_LOCK_RM = 0, - DMS_SMON_REQ_TABLE_LOCK_WAIT_RM = 1, + DMS_SMON_REQ_TLOCK_RM = 0, + DMS_SMON_REQ_TLOCK_WAIT_RM = 1, }dms_smon_req_rm_type_t; -typedef enum en_dms_smon_check_tlock_type { - DMS_SMON_CHECK_WAIT_EVENT_STATUS_BY_SID = 0, - DMS_SMON_CHECK_WAIT_TABLE_STATUS_BY_TID = 1, -}dms_smon_check_tlock_type_t; - /* distributed resource id definition */ +#define DMS_DRID_CTX_SIZE 128 #pragma pack(4) typedef struct st_dms_drid { union { @@ -155,7 +157,10 @@ typedef struct st_dms_drid { }; struct { unsigned short type; // lock type - unsigned short uid; // user id, for table lock resource + union { + unsigned short uid; // user id, for table lock resource + unsigned short len; + }; union { struct { unsigned int oid; // lock id @@ -167,6 +172,9 @@ typedef struct st_dms_drid { unsigned long long oid_64; unsigned long long unused; }; + struct { + unsigned char resid[DMS_DRID_CTX_SIZE]; + }; }; }; }; @@ -254,7 +262,7 @@ typedef struct st_dms_cr_assist_t { dms_cr_status_t status; /* OUT parameter */ } dms_cr_assist_t; -#define DMS_RESID_SIZE 32 +#define DMS_RESID_SIZE 132 #define DMS_DRID_SIZE sizeof(dms_drid_t) typedef struct st_dms_drlock { @@ -328,8 +336,12 @@ typedef struct st_dms_context { unsigned char edp_inst; drc_global_xid_t global_xid; }; + void *stat; + void *stat_instance; + unsigned long long wait_usecs; unsigned char intercept_type; unsigned char curr_mode; // used for table lock + unsigned long long max_wait_rsp_time; // unit ms. under some circumstances, dms need get rsp quickly for timeout. } dms_context_t; typedef struct st_dms_cr { @@ -389,6 +401,7 @@ typedef struct st_dms_txn_info { unsigned char is_owscn; unsigned char status; unsigned char unused[2]; + unsigned long long xid; } dms_txn_info_t; typedef struct st_dms_txn_snapshot { @@ -425,10 +438,14 @@ typedef struct st_dms_buf_ctrl { volatile unsigned char need_flush; // for recovery, owner is abort, copy instance should flush before release volatile unsigned char been_loaded; // first alloc ctrl:FALSE, after successfully loaded: TRUE volatile unsigned char in_rcy; // if drc lost, we can rebuild in_recovery flag according buf_ctrl - volatile unsigned char unused; + unsigned char release_conflict : 1; + unsigned char unused : 7; unsigned long long edp_scn; // set when become edp, lastest scn when page becomes edp unsigned long long edp_map; // records edp instance long long last_ckpt_time; // last time when local edp page is added to group. + volatile unsigned char is_reform_visit; + unsigned unused_array[3]; + volatile unsigned int lock_ss_read; // concurrency control for rebuild/confirm #ifdef OPENGAUSS int buf_id; unsigned int state; @@ -437,7 +454,7 @@ typedef struct st_dms_buf_ctrl { unsigned long long pblk_lsn; unsigned char seg_fileno; unsigned int seg_blockno; - void* ctrl_lock; + void *ctrl_lock; #endif } dms_buf_ctrl_t; @@ -568,10 +585,13 @@ typedef enum en_dms_wait_event { DMS_EVT_DLS_REQ_TABLE, DMS_EVT_DLS_REQ_PART_X, DMS_EVT_DLS_REQ_PART_S, + DMS_EVT_DLS_REQ_ALOCK_X, + DMS_EVT_DLS_REQ_ALOCK_S, DMS_EVT_DLS_WAIT_TXN, DMS_EVT_DEAD_LOCK_TXN, DMS_EVT_DEAD_LOCK_TABLE, DMS_EVT_DEAD_LOCK_ITL, + DMS_EVT_DEAD_LOCK_ALCK, DMS_EVT_BROADCAST_BTREE_SPLIT, DMS_EVT_BROADCAST_ROOT_PAGE, DMS_EVT_QUERY_OWNER_ID, @@ -638,6 +658,13 @@ typedef enum en_dms_reform_type { DMS_REFORM_TYPE_FOR_MAINTAIN, // for start database without CM, every instance is supported // New type need to be added start from here DMS_REFORM_TYPE_FOR_RST_RECOVER, + DMS_REFORM_TYPE_FOR_NEW_JOIN, + DMS_REFORM_TYPE_FOR_STANDBY_MAINTAIN, + DMS_REFORM_TYPE_FOR_NORMAL_STANDBY, + DMS_REFORM_TYPE_FOR_AZ_SWITCHOVER_DEMOTE, + DMS_REFORM_TYPE_FOR_AZ_SWITCHOVER_PROMOTE, + DMS_REFORM_TYPE_FOR_AZ_FAILOVER, + DMS_REFORM_TYPE_COUNT } dms_reform_type_t; @@ -686,6 +713,7 @@ typedef struct st_stat_buf_info { typedef enum en_broadcast_scope { DMS_BROADCAST_OLDIN_LIST = 0, // default value DMS_BROADCAST_ONLINE_LIST = 1, + DMS_BROADCAST_SPECIFY_LIST = 2, DMS_BROADCAST_TYPE_COUNT, } dms_broadcast_scope_e; @@ -695,7 +723,7 @@ typedef enum en_broadcast_scope { typedef struct st_dv_drc_buf_info { stat_buf_info_t buf_info[DMS_MAX_INSTANCES]; /* save buffer related information */ dms_context_t dms_ctx; - char data[DMS_MAX_NAME_LEN]; /* user defined resource(page) identifier */ + char data[DMS_MAX_RESOURCE_NAME_LEN]; /* user defined resource(page) identifier */ unsigned char master_id; unsigned long long copy_insts; /* bitmap for owners, for S mode, more than one owner may exist */ unsigned char claimed_owner; /* owner */ @@ -723,6 +751,26 @@ typedef struct st_dms_reform_start_context { unsigned long long bitmap_reconnect; } dms_reform_start_context_t; +typedef enum en_dms_db_role { + DMS_DB_ROLE_PRIMARY = 0, + DMS_DB_ROLE_PHYSICAL_STANDBY = 1, + DMS_DB_ROLE_CASCADED_PHYSICAL_STANDBY = 2, +} dms_db_role_t; + +typedef struct st_dms_broadcast_info { + char *data; + unsigned int len; + char *output; + unsigned int *output_len; + dms_broadcast_scope_e scope; + unsigned long long inst_map; /* when scope is DMS_BROADCAST_SPECIFY_LIST, inst_map is used */ + unsigned int timeout; + unsigned char handle_recv_msg; + unsigned char check_session_kill; +} dms_broadcast_info_t; + +typedef struct dms_fi_entry dms_fi_entry; +typedef int(*dms_fi_callback_func)(const dms_fi_entry *entry, va_list args); typedef int(*dms_get_list_stable)(void *db_handle, unsigned long long *list_stable, unsigned char *reformer_id); typedef int(*dms_save_list_stable)(void *db_handle, unsigned long long list_stable, unsigned char reformer_id, unsigned long long list_in, unsigned int save_ctrl); @@ -761,6 +809,7 @@ typedef int(*dms_set_buf_load_status)(dms_buf_ctrl_t *buf_ctrl, dms_buf_load_sta typedef int(*dms_remove_buf_load_status)(dms_buf_ctrl_t *buf_ctrl, dms_buf_load_status_t dms_buf_load_status); typedef void(*dms_update_global_lsn)(void *db_handle, unsigned long long lamport_lsn); typedef void(*dms_update_global_scn)(void *db_handle, unsigned long long lamport_scn); +typedef void(*dms_update_node_lfn)(void *db_handle, unsigned long long lfn, char node_id); typedef void(*dms_update_page_lfn)(dms_buf_ctrl_t *buf_ctrl, unsigned long long lastest_lfn); typedef unsigned long long (*dms_get_page_lfn)(dms_buf_ctrl_t *buf_ctrl); typedef unsigned long long(*dms_get_global_lfn)(void *db_handle); @@ -838,6 +887,8 @@ typedef void (*dms_reset_user)(void *db_handle, unsigned long long list_in); typedef int (*dms_drc_xa_res_rebuild)(void *db_handle, unsigned char thread_index, unsigned char parall_num); typedef void (*dms_reform_shrink_xa_rms)(void *db_handle, unsigned char undo_seg_id); typedef void (*dms_ckpt_unblock_rcy_local)(void *db_handle, unsigned long long list_in); +typedef int (*dms_drc_rebuild_parallel)(void *db_handle, unsigned char thread_index, unsigned char thread_num); +typedef int (*dms_drc_validate_parallel)(void *db_handle, unsigned char thread_index, unsigned char thread_num); // for openGauss typedef void (*dms_thread_init_t)(unsigned char need_startup, char **reg_data); @@ -855,12 +906,10 @@ typedef void (*dms_get_txn_dlock_by_rmid)(void *db_handle, unsigned short rmid, typedef void (*dms_get_rowid_by_rmid)(void *db_handle, unsigned short rmid, char rowid[DMS_ROWID_SIZE]); typedef void (*dms_get_sql_from_session)(void *db_handle, unsigned short sid, char *sql_str, unsigned int sql_str_len); typedef int (*dms_get_itl_lock_by_xid)(void *db_handle, char xid[DMS_XID_SIZE], char *ilock, unsigned int ilock_len); -typedef void (*dms_check_tlock_status)(void *db_handle, unsigned int type, unsigned short sid, - unsigned long long tableid, unsigned int *in_use); -typedef void (*dms_get_tlock_msg_by_tid)(void *db_handle, unsigned long long table_id, unsigned int type, char *rsp, - unsigned int rsp_len, unsigned int *tlock_cnt); -typedef void (*dms_get_tlock_msg_by_rm)(void *db_handle, unsigned short sid, unsigned short rmid, int type, char *tlock, +typedef void (*dms_get_tlock_by_rm)(void *db_handle, unsigned short sid, unsigned short rmid, int type, char *tlock, unsigned int tlock_len); +typedef int (*dms_get_tlock_by_tid)(void *db_handle, char *tlock, char *out_msg); +typedef void (*dms_get_tlock_by_tid_ack)(char *data, char *stack, char *w_marks, unsigned int *cnt); typedef int (*dms_switchover_demote)(void *db_handle); typedef int (*dms_switchover_promote)(void *db_handle); @@ -893,7 +942,28 @@ typedef void (*dms_free_prot_proc)(void *ptr); typedef int (*dms_get_kernel_error_code)(); typedef int (*dms_lsn_validate)(void *db_handle, char *pageid, unsigned long long lsn, unsigned char in_recovery); typedef int (*dms_invld_tlock_ownership)(void *db_handle, char *resid, unsigned char req_mode, unsigned char is_try); -typedef int (*dms_get_tlock_mode)(void *db_handle, char *resid); +typedef unsigned short (*dms_get_tlock_mode)(void *db_handle, char *resid); +typedef void (*dms_set_current_point)(void *db_handle); + +typedef void (*dms_get_db_role)(void *db_handle, unsigned int *role); +typedef void (*dms_check_lrpl_takeover)(void *db_handle, unsigned int *need_takeover); +typedef void (*dms_reset_link)(void *db_handle); +typedef void (*dms_set_online_list)(void *db_handle, unsigned long long online_list); +typedef int (*dms_start_lrpl)(void *db_handle, int is_reformer); +typedef int (*dms_stop_lrpl)(void *db_handle, int is_reformer); +typedef int (*dms_az_switchover_demote_phase1)(void *db_handle); +typedef int (*dms_az_switchover_demote_approve)(void *db_handle); +typedef int (*dms_az_switchover_demote_phase2)(void *db_handle); +typedef int (*dms_az_switchover_promote_core)(void *db_handle); +typedef void (*dms_dyn_log)(void *db_handle, long long dyn_log_time); + +typedef int (*dms_invld_alock_ownership)(void *db_handle, char *resid, unsigned char req_mode, unsigned char is_try); +typedef unsigned short (*dms_get_alock_mode)(void *db_handle, char *resid); +typedef int (*dms_get_alock_wait_info)(void *db_handle, char *resid, char *info_buf, unsigned int buf_len, + unsigned int *info_len); +typedef int (*dms_az_failover_promote_phase1)(void *db_handle); +typedef int (*dms_az_failover_promote_resetlog)(void *db_handle); +typedef int (*dms_az_failover_promote_phase2)(void *db_handle); typedef struct st_dms_callback { // used in reform @@ -935,6 +1005,8 @@ typedef struct st_dms_callback { dms_reform_shrink_xa_rms dms_shrink_xa_rms; dms_ckpt_unblock_rcy_local ckpt_unblock_rcy_local; + dms_drc_rebuild_parallel rebuild_alock_parallel; + dms_drc_validate_parallel validate_alock_parallel; // used in reform for opengauss dms_thread_init_t dms_thread_init; dms_thread_deinit_t dms_thread_deinit; @@ -1015,9 +1087,9 @@ typedef struct st_dms_callback { dms_get_rowid_by_rmid get_rowid_by_rmid; dms_get_sql_from_session get_sql_from_session; dms_get_itl_lock_by_xid get_itl_lock_by_xid; - dms_check_tlock_status check_tlock_status; - dms_get_tlock_msg_by_tid get_tlock_by_tid; - dms_get_tlock_msg_by_rm get_tlock_by_rm; + dms_get_tlock_by_rm get_tlock_by_rm; + dms_get_tlock_by_tid get_tlock_by_tid; + dms_get_tlock_by_tid_ack get_tlock_by_tid_ack; // for switchover dms_switchover_demote switchover_demote; @@ -1057,7 +1129,30 @@ typedef struct st_dms_callback { dms_get_kernel_error_code db_get_kernel_error_code; dms_lsn_validate lsn_validate; dms_invld_tlock_ownership invld_tlock_ownership; + dms_invld_alock_ownership invld_alock_ownership; + dms_get_alock_mode get_alock_mode; dms_get_tlock_mode get_tlock_mode; + dms_set_current_point set_current_point; + dms_update_node_lfn update_node_lfn; + + dms_get_db_role get_db_role; + dms_check_lrpl_takeover check_lrpl_takeover; + dms_reset_link reset_link; + dms_set_online_list set_online_list; + dms_start_lrpl start_lrpl; + dms_stop_lrpl stop_lrpl; + + // for az switchover and az failover + dms_az_switchover_demote_phase1 az_switchover_demote_phase1; + dms_az_switchover_demote_approve az_switchover_demote_approve; + dms_az_switchover_demote_phase2 az_switchover_demote_phase2; + dms_az_switchover_promote_core az_switchover_promote; + dms_az_failover_promote_phase1 az_failover_promote_phase1; + dms_az_failover_promote_resetlog az_failover_promote_resetlog; + dms_az_failover_promote_phase2 az_failover_promote_phase2; + + dms_dyn_log dyn_log; + dms_get_alock_wait_info get_alock_wait_info; } dms_callback_t; typedef struct st_dms_instance_net_addr { @@ -1069,6 +1164,12 @@ typedef struct st_dms_instance_net_addr { unsigned char reserved[1]; } dms_instance_net_addr_t; +typedef struct dms_fi_config { + unsigned int entries[MAX_FI_ENTRY_COUNT]; + unsigned int count; + unsigned int fault_value; +} dms_fi_config_t; + typedef struct st_dms_profile { unsigned int inst_id; unsigned long long inst_map; @@ -1161,8 +1262,6 @@ typedef enum en_reform_callback_stat { REFORM_CALLBACK_STAT_CKPT_LATCH = 0, REFORM_CALLBACK_STAT_BUCKET_LOCK, REFORM_CALLBACK_STAT_SS_READ_LOCK, - REFORM_CALLBACK_STAT_ENTRY_TLOCK, - REFORM_CALLBACK_STAT_PART_ENTRY_TLOCK, REFORM_CALLBACK_STAT_REBUILD_TLOCK_REMOTE, REFORM_CALLBACK_STAT_GET_DISK_LSN, REFORM_CALLBACK_STAT_DRC_EXIST, @@ -1180,26 +1279,55 @@ typedef enum en_reform_callback_stat { REFORM_CALLBACK_STAT_VALIDATE_DRC_PAGE_BUCKET_LOCK, REFORM_CALLBACK_STAT_VALIDATE_DRC_PAGE_SS_READ_LOCK, REFORM_CALLBACK_STAT_VALIDATE_DRC_PAGE_REMOTE, - REFORM_CALLBACK_STAT_VALIDATE_DRC_ENTRY_TLOCK, - REFORM_CALLBACK_STAT_VALIDATE_DRC_PART_ENTRY_TLOCK, REFORM_CALLBACK_STAT_VALIDATE_DRC_TLOCK_REMOTE, REFORM_MES_TASK_STAT_VALIDATE_LSN_GET_CTRL, REFORM_MES_TASK_STAT_VALIDATE_LSN_GET_CTRL_TIMEOUT, REFORM_MES_TASK_STAT_VALIDATE_LSN_GET_DISK_LSN, REFORM_MES_TASK_STAT_VALIDATE_LSN_BUF_UNLATCH, - + REFORM_CALLBACK_STAT_REBUILD_ALOCK_LOCAL, + REFORM_CALLBACK_STAT_REBUILD_DRC_ALOCK_REMOTE, + REFORM_CALLBACK_STAT_VALIDATE_ALOCK_LOCAL, + REFORM_CALLBACK_STAT_VALIDATE_DRC_ALOCK_REMOTE, REFORM_CALLBACK_STAT_COUNT } reform_callback_stat_e; +typedef enum e_dms_fi_type { + DMS_FI_TYPE_BEGIN = 0, + DMS_FI_TYPE_PACKET_LOSS = DMS_FI_TYPE_BEGIN, + DMS_FI_TYPE_NET_LATENCY, + DMS_FI_TYPE_CPU_LATENCY, + DMS_FI_TYPE_PROCESS_FAULT, + DMS_FI_TYPE_CUSTOM_FAULT, + DMS_FI_TYPE_END, +} dms_fi_type_e; + +struct dms_fi_entry { + int pointId; + unsigned int faultFlags; + int calledCount; + dms_fi_callback_func func; +}; + typedef struct st_dms_tlock_info { dms_drid_t resid; unsigned char lock_mode; unsigned char unused[3]; } dms_tlock_info_t; +typedef dms_tlock_info_t dms_alock_info_t; + +typedef struct thread_info { + char thread_name[DMS_MAX_NAME_LEN]; + void *thread_info; +} thread_info_t; + +typedef struct thread_set { + thread_info_t threads[MAX_DMS_THREAD_NUM]; + int thread_count; +} thread_set_t; + #ifdef __cplusplus } #endif #endif /* __DMS_H__ */ - diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index 40c581d15..ebf6a8cc1 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -138,10 +138,10 @@ #define BUF_ONDEMAND_REDO_DONE 0x1000 #define SS_BROADCAST_FAILED_RETRYCOUNTS 4 -#define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFF) -#define SS_BROADCAST_WAIT_FIVE_SECONDS (5000) -#define SS_BROADCAST_WAIT_ONE_SECOND (1000) -#define SS_BROADCAST_WAIT_FIVE_MICROSECONDS (5) +#define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFFU) +#define SS_BROADCAST_WAIT_FIVE_SECONDS (5000U) +#define SS_BROADCAST_WAIT_ONE_SECOND (1000U) +#define SS_BROADCAST_WAIT_FIVE_MICROSECONDS (5U) #define SS_ACQUIRE_LOCK_DO_NOT_WAIT 0 #define SS_ACQUIRE_LOCK_RETRY_INTERVAL (50) // 50ms diff --git a/src/include/ddes/dms/ss_dms.h b/src/include/ddes/dms/ss_dms.h index 5f4302b3e..1f7be0dd4 100644 --- a/src/include/ddes/dms/ss_dms.h +++ b/src/include/ddes/dms/ss_dms.h @@ -43,8 +43,7 @@ typedef struct st_ss_dms_func { void (*dms_get_error)(int *errcode, const char **errmsg); void (*dms_uninit)(void); int (*dms_request_page)(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, dms_lock_mode_t mode); - int (*dms_broadcast_msg)(dms_context_t *dms_ctx, char *data, unsigned int len, unsigned char handle_recv_msg, - unsigned int timeout); + int (*dms_broadcast_msg)(dms_context_t *dms_ctx, dms_broadcast_info_t *dms_broad_info); int (*dms_request_opengauss_update_xid)(dms_context_t *dms_ctx, unsigned short t_infomask, unsigned short t_infomask2, unsigned long long *uxid); int (*dms_request_opengauss_xid_csn)(dms_context_t *dms_ctx, dms_opengauss_xid_csn_t *dms_txn_info, @@ -101,8 +100,7 @@ void dms_refresh_logger(char *log_field, unsigned long long *value); void dms_get_error(int *errcode, const char **errmsg); void dms_uninit(void); int dms_request_page(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, dms_lock_mode_t mode); -int dms_broadcast_msg(dms_context_t *dms_ctx, char *data, unsigned int len, unsigned char handle_recv_msg, - unsigned int timeout); +int dms_broadcast_msg(dms_context_t *dms_ctx, dms_broadcast_info_t *dms_broad_info); int dms_request_opengauss_update_xid(dms_context_t *dms_ctx, unsigned short t_infomask, unsigned short t_infomask2, unsigned long long *uxid); int dms_request_opengauss_xid_csn(dms_context_t *dms_ctx, dms_opengauss_xid_csn_t *dms_txn_info, diff --git a/src/include/ddes/dms/ss_dms_bufmgr.h b/src/include/ddes/dms/ss_dms_bufmgr.h index 8c1a33ea3..269f944de 100644 --- a/src/include/ddes/dms/ss_dms_bufmgr.h +++ b/src/include/ddes/dms/ss_dms_bufmgr.h @@ -32,16 +32,6 @@ #define SS_BUF_MAX_WAIT_TIME (1000L * 1000 * 20) // 20s #define SS_BUF_WAIT_TIME_IN_ONDEMAND_REALTIME_BUILD (100000L) // 100ms -#define DmsInitLatch(drid, _type, _oid, _idx, _parent_part, _part, _uid) \ - do { \ - (drid)->type = _type; \ - (drid)->uid = _uid; \ - (drid)->oid = _oid; \ - (drid)->index = _idx; \ - (drid)->parent_part = _parent_part; \ - (drid)->part = _part; \ - } while (0) - typedef struct SSBroadcastDDLLock { SSBroadcastOp type; // must be first LOCKTAG locktag; diff --git a/src/test/ss/build_ss_database.sh b/src/test/ss/build_ss_database.sh index 8f2bf56e8..035c7360a 100644 --- a/src/test/ss/build_ss_database.sh +++ b/src/test/ss/build_ss_database.sh @@ -50,7 +50,6 @@ alter_dms_open() for node in $@ do echo -e "\nautovacuum=false" >> ${node}/postgresql.conf - echo -e "\nss_enable_reform = off" >> ${node}/postgresql.conf echo "${node}:" cat ${node}/postgresql.conf | grep ss_enable_dms done diff --git a/src/test/ss/build_ss_database_common.sh b/src/test/ss/build_ss_database_common.sh index c060ce0ff..e87104045 100644 --- a/src/test/ss/build_ss_database_common.sh +++ b/src/test/ss/build_ss_database_common.sh @@ -56,7 +56,6 @@ assign_hatest_parameter() { for node in $@ do - echo -e "ss_enable_reform = on" >> ${node}/postgresql.conf echo -e "log_min_messages = log" >> ${node}/postgresql.conf echo -e "logging_module = 'on(ALL)'" >> ${node}/postgresql.conf echo -e "ss_log_level = 255" >> ${node}/postgresql.conf diff --git a/src/test/ss/ss_database_build_env.sh b/src/test/ss/ss_database_build_env.sh index afe6e21fc..4ff258531 100644 --- a/src/test/ss/ss_database_build_env.sh +++ b/src/test/ss/ss_database_build_env.sh @@ -66,7 +66,6 @@ assign_dms_parameter() for node in $@ do echo -e "\nautovacuum=false" >> ${node}/postgresql.conf - echo -e "\nss_enable_reform=on" >> ${node}/postgresql.conf echo -e "\nss_enable_ssl = 0" >> ${node}/postgresql.conf echo -e "\nlog_min_messages = warning" >> ${node}/postgresql.conf echo -e "\nlogging_module ='on(ALL)'" >> ${node}/postgresql.conf