diff --git a/src/common/backend/utils/time/snapmgr.cpp b/src/common/backend/utils/time/snapmgr.cpp index d771423e3..6d403c73c 100644 --- a/src/common/backend/utils/time/snapmgr.cpp +++ b/src/common/backend/utils/time/snapmgr.cpp @@ -218,7 +218,7 @@ bool XidVisibleInSnapshot(TransactionId xid, Snapshot snapshot, TransactionIdSta loop: if (ENABLE_DMS) { /* fetch TXN info locally if either reformer, original primary, or normal primary */ - if (SS_PRIMARY_MODE || SS_OFFICIAL_PRIMARY) { + if (SSCanFetchLocalSnapshotTxnRelatedInfo()) { csn = TransactionIdGetCommitSeqNo(xid, false, true, false, snapshot); } else { csn = SSTransactionIdGetCommitSeqNo(xid, false, true, false, snapshot, sync); @@ -373,7 +373,7 @@ bool CommittedXidVisibleInSnapshot(TransactionId xid, Snapshot snapshot, Buffer loop: if (ENABLE_DMS) { /* fetch TXN info locally if either reformer, original primary, or normal primary */ - if (SS_PRIMARY_MODE || SS_OFFICIAL_PRIMARY) { + if (SSCanFetchLocalSnapshotTxnRelatedInfo()) { csn = TransactionIdGetCommitSeqNo(xid, true, true, false, snapshot); } else { csn = SSTransactionIdGetCommitSeqNo(xid, true, true, false, snapshot, NULL); diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 1bba1cacc..b0742971c 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -955,7 +955,7 @@ long SSGetBufSleepTime(int retry_times) bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode) { bool get_lock = false; - int wait_tickets = 2000; + int wait_tickets = 1000; int cur_tickets = 0; do { @@ -972,7 +972,7 @@ bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode) } while (true); if (!get_lock) { - ereport(WARNING, (errcode(MOD_DMS), (errmsg("[SS lwlock] request LWLock:%p timeout, LWLockMode:%d, timeout:2s", + ereport(WARNING, (errcode(MOD_DMS), (errmsg("[SS lwlock] request LWLock:%p timeout, LWLockMode:%d, timeout:1s", lock, mode)))); } return get_lock; diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index 51725fb2d..a770884d8 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -80,6 +80,10 @@ void SSWakeupRecovery(void) static int CBGetUpdateXid(void *db_handle, unsigned long long xid, unsigned int t_infomask, unsigned int t_infomask2, unsigned long long *uxid) { + if (!SSCanFetchLocalSnapshotTxnRelatedInfo()) { + return DMS_ERROR; + } + int result = DMS_SUCCESS; uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; @@ -172,6 +176,10 @@ static CommitSeqNo TransactionWaitCommittingCSN(dms_opengauss_xid_csn_t *xid_csn static int CBGetTxnCSN(void *db_handle, dms_opengauss_xid_csn_t *csn_req, dms_opengauss_csn_result_t *csn_res) { + if (!SSCanFetchLocalSnapshotTxnRelatedInfo()) { + return DMS_ERROR; + } + int ret; uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; PG_TRY(); @@ -206,6 +214,10 @@ static int CBGetSnapshotData(void *db_handle, dms_opengauss_txn_snapshot_t *txn_ return DMS_ERROR; } + if (!SSCanFetchLocalSnapshotTxnRelatedInfo()) { + return DMS_ERROR; + } + int retCode = DMS_ERROR; SnapshotData snapshot = {SNAPSHOT_MVCC}; uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; @@ -243,6 +255,10 @@ static int CBGetTxnSwinfo(void *db_handle, dms_opengauss_txn_sw_info_t *txn_swin return DMS_ERROR; } + if (!SSCanFetchLocalSnapshotTxnRelatedInfo()) { + return DMS_ERROR; + } + int retCode = DMS_SUCCESS; uint32 slot = txn_swinfo->server_proc_slot; PGXACT* pgxact = &g_instance.proc_base_all_xacts[slot]; @@ -258,6 +274,10 @@ static int CBGetTxnSwinfo(void *db_handle, dms_opengauss_txn_sw_info_t *txn_swin static int CBGetTxnStatus(void *db_handle, unsigned long long xid, unsigned char type, unsigned char *result) { + if (!SSCanFetchLocalSnapshotTxnRelatedInfo()) { + return DMS_ERROR; + } + uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; PG_TRY(); { @@ -1781,16 +1801,21 @@ static void SSXminInfoPrepare() item->notify_oldest_xmin = MaxTransactionId; SpinLockRelease(&item->item_lock); } + + if (!SSPerformingStandbyScenario()) { + SpinLockAcquire(&xmin_info->snapshot_available_lock); + xmin_info->snapshot_available = false; + SpinLockRelease(&xmin_info->snapshot_available_lock); + } } xmin_info->bitmap_active_nodes = 0; } -static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char reform_type, - unsigned long long bitmap_nodes) +static void CBReformStartNotify(void *db_handle, dms_reform_start_context_t *rs_cxt) { ss_reform_info_t *reform_info = &g_instance.dms_cxt.SSReformInfo; reform_info->is_hashmap_constructed = false; - reform_info->reform_type = (dms_reform_type_t)reform_type; + reform_info->reform_type = rs_cxt->reform_type; g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL; g_instance.dms_cxt.SSRecoveryInfo.reform_ready = false; g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy = false; @@ -1799,7 +1824,7 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char if (reform_info->reform_type == DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS) { g_instance.dms_cxt.SSRecoveryInfo.in_failover = true; g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag = true; - if (role == DMS_ROLE_REFORMER) { + if (rs_cxt->role == DMS_ROLE_REFORMER) { g_instance.dms_cxt.dw_init = false; // variable set order: SharedRecoveryInProgress -> failover_ckpt_status -> dms_role volatile XLogCtlData *xlogctl = t_thrd.shemem_ptr_cxt.XLogCtl; @@ -1818,15 +1843,17 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char } INSTR_TIME_SET_CURRENT(reform_info->reform_start_time); - reform_info->bitmap_nodes = bitmap_nodes; - reform_info->dms_role = role; - reform_info->in_reform = true; + reform_info->bitmap_nodes = rs_cxt->bitmap_participated; + reform_info->bitmap_reconnect = rs_cxt->bitmap_reconnect; + reform_info->dms_role = rs_cxt->role; SSXminInfoPrepare(); + reform_info->in_reform = true; char reform_type_str[reform_type_str_len] = {0}; ReformTypeToString(reform_info->reform_type, reform_type_str); ereport(LOG, (errmodule(MOD_DMS), - errmsg("[SS reform] dms reform start, role:%d, reform type:%s", role, reform_type_str))); + errmsg("[SS reform] dms reform start, role:%d, reform type:%s, standby scenario:%d", + reform_info->dms_role, reform_type_str, SSPerformingStandbyScenario()))); if (reform_info->dms_role == DMS_ROLE_REFORMER) { while (dss_set_server_status_wrapper() != GS_SUCCESS) { pg_usleep(REFORM_WAIT_LONG); @@ -1845,7 +1872,7 @@ static void CBReformStartNotify(void *db_handle, dms_role_t role, unsigned char if (SS_STANDBY_FAILOVER) { AliveFailoverCleanBackends(); - } else { + } else if (!SSPerformingStandbyScenario()) { ReformCleanBackends(); } @@ -1879,6 +1906,7 @@ static int CBReformDoneNotify(void *db_handle) g_instance.attr.attr_storage.dms_attr.instance_id))); /* reform success indicates that reform of primary and standby all complete, then update gaussdb.state */ + g_instance.dms_cxt.dms_status = (dms_status_t)DMS_STATUS_IN; SendPostmasterSignal(PMSIGNAL_DMS_REFORM_DONE); g_instance.dms_cxt.SSClusterState = NODESTATE_NORMAL; g_instance.dms_cxt.SSReformInfo.in_reform = false; diff --git a/src/gausskernel/ddes/adapter/ss_reform_common.cpp b/src/gausskernel/ddes/adapter/ss_reform_common.cpp index 6fb242849..799431440 100644 --- a/src/gausskernel/ddes/adapter/ss_reform_common.cpp +++ b/src/gausskernel/ddes/adapter/ss_reform_common.cpp @@ -492,4 +492,17 @@ void SSDoradoUpdateHAmode() ereport(LOG, (errmsg("SSDoradoUpdateHAmode change control file cluster run mode to: %d", t_thrd.postmaster_cxt.HaShmData->current_mode))); } +} + +bool SSPerformingStandbyScenario() +{ + if (SS_IN_REFORM) { + if (g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_NORMAL_OPENGAUSS && + ((uint64)(0x1 << SS_PRIMARY_ID) & g_instance.dms_cxt.SSReformInfo.bitmap_reconnect) == 0) { + return true; + } else if (g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_FULL_CLEAN) { + return true; + } + } + return false; } \ No newline at end of file diff --git a/src/gausskernel/ddes/adapter/ss_transaction.cpp b/src/gausskernel/ddes/adapter/ss_transaction.cpp index aac8aef5c..40a3b6dba 100644 --- a/src/gausskernel/ddes/adapter/ss_transaction.cpp +++ b/src/gausskernel/ddes/adapter/ss_transaction.cpp @@ -61,11 +61,6 @@ static Snapshot SSGetSnapshotDataFromMaster(Snapshot snapshot) if (dms_request_opengauss_txn_snapshot(&dms_ctx, &dms_snapshot) == DMS_SUCCESS) { break; } - - if (SS_IN_REFORM) { - ereport(DEBUG1, (errmsg("[SS reform] SSGetSnapshotData returns NULL in reform."))); - return NULL; - } pg_usleep(USECS_PER_SEC); } while (true); @@ -95,11 +90,6 @@ static Snapshot SSGetSnapshotDataFromMaster(Snapshot snapshot) Snapshot SSGetSnapshotData(Snapshot snapshot) { - if (SS_IN_REFORM) { - ereport(DEBUG1, (errmsg("[SS reform] SSGetSnapshotData returns NULL in reform."))); - return NULL; - } - /* For cm agent, it only query the system status using the parameter in memory. So don't need MVCC */ if (u_sess->libpq_cxt.IsConnFromCmAgent) { snapshot = SnapshotNow; @@ -288,7 +278,8 @@ CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCo dms_txn_info.snapshotxmin = InvalidTransactionId; } - if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { + if (SS_IN_REFORM && !SSPerformingStandbyScenario() && + (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(ERROR, (errmsg("SSTransactionIdGetCommitSeqNo failed during reform, xid=%lu.", transactionId))); } @@ -309,7 +300,7 @@ CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCo } break; } else { - if (SS_IN_REFORM && + if (SS_IN_REFORM && !SSPerformingStandbyScenario() && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(ERROR, (errmsg("SSTransactionIdGetCommitSeqNo failed during reform, xid=%lu.", transactionId))); } @@ -394,7 +385,7 @@ void SSTransactionIdDidCommit(TransactionId transactionId, bool* ret_did_commit) transactionId, did_commit))); break; } else { - if (SS_IN_REFORM && + if (SS_IN_REFORM && !SSPerformingStandbyScenario() && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(ERROR, (errmsg("SSTransactionIdDidCommit failed during reform, xid=%lu.", transactionId))); } @@ -428,7 +419,7 @@ void SSTransactionIdIsInProgress(TransactionId transactionId, bool *in_progress) transactionId, *in_progress))); break; } else { - if (SS_IN_REFORM && + if (SS_IN_REFORM && !SSPerformingStandbyScenario() && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(ERROR, (errmsg("SSTransactionIdIsInProgress failed during reform, xid=%lu.", transactionId))); } @@ -453,7 +444,7 @@ TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, u ereport(DEBUG1, (errmsg("SS get update xid success, multixact xid=%lu, uxid=%lu.", xmax, update_xid))); break; } else { - if (SS_IN_REFORM && + if (SS_IN_REFORM && !SSPerformingStandbyScenario() && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER || t_thrd.role == STREAM_WORKER)) { ereport(ERROR, (errmsg("SSMultiXactIdGetUpdateXid failed during reform, xid=%lu.", xmax))); } @@ -964,4 +955,25 @@ void SSStandbyUpdateRedirectInfo() redirect_manager->ss_standby_sxid = dms_sw_info.sxid; redirect_manager->ss_standby_scid = dms_sw_info.scid; +} + +bool SSCanFetchLocalSnapshotTxnRelatedInfo() +{ + if (SS_NORMAL_PRIMARY) { + return true; + } else if (SS_PERFORMING_SWITCHOVER) { + if (SS_REFORM_REFORMER && g_instance.dms_cxt.SSClusterState < NODESTATE_PROMOTE_APPROVE) { + return true; + } + } else if (SS_REFORM_REFORMER && SSPerformingStandbyScenario()) { + return true; + } else if (SS_REFORM_REFORMER && SS_ONDEMAND_BUILD_DONE) { + ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo; + SpinLockAcquire(&xmin_info->snapshot_available_lock); + bool snap_available = xmin_info->snapshot_available; + SpinLockRelease(&xmin_info->snapshot_available_lock); + return snap_available; + } + + return false; } \ No newline at end of file diff --git a/src/gausskernel/ddes/adapter/ss_xmin.cpp b/src/gausskernel/ddes/adapter/ss_xmin.cpp index c5475c237..7b4ce8e54 100644 --- a/src/gausskernel/ddes/adapter/ss_xmin.cpp +++ b/src/gausskernel/ddes/adapter/ss_xmin.cpp @@ -235,6 +235,9 @@ void SSSyncOldestXminWhenReform(uint8 reformer_id) LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); CalculateLocalLatestSnapshot(true); LWLockRelease(ProcArrayLock); + SpinLockAcquire(&xmin_info->snapshot_available_lock); + xmin_info->snapshot_available = true; + SpinLockRelease(&xmin_info->snapshot_available_lock); } else { int ret = DMS_SUCCESS; do { diff --git a/src/gausskernel/ddes/ddes_commit_id b/src/gausskernel/ddes/ddes_commit_id index ca66a0378..adf1b16e3 100644 --- a/src/gausskernel/ddes/ddes_commit_id +++ b/src/gausskernel/ddes/ddes_commit_id @@ -1,3 +1,3 @@ -dms_commit_id=87f7ded3ff6c66dcdfa979f1e1e289e4fd452ff2 +dms_commit_id=d2024b8747272b35d59b46b3ca38497ef6dc255e dss_commit_id=a304c8a21cc67282215d7653d5aa596810688149 cbb_commit_id=100323132dff039c9225216e64770050ad8a6d2b \ No newline at end of file diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index 7a2b377c4..1a096b7b4 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -229,6 +229,8 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt) xmin_info->global_oldest_xmin_active = false; SpinLockInit(&xmin_info->bitmap_active_nodes_lock); xmin_info->bitmap_active_nodes = 0; + SpinLockInit(&xmin_info->snapshot_available_lock); + xmin_info->snapshot_available = false; } dms_cxt->latest_snapshot_xmin = 0; dms_cxt->latest_snapshot_xmax = 0; diff --git a/src/gausskernel/storage/access/heap/heapam.cpp b/src/gausskernel/storage/access/heap/heapam.cpp index fa43969c3..6deb69a9d 100755 --- a/src/gausskernel/storage/access/heap/heapam.cpp +++ b/src/gausskernel/storage/access/heap/heapam.cpp @@ -7832,7 +7832,7 @@ TransactionId MultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, uin { if (ENABLE_DMS) { /* fetch TXN info locally if either reformer, original primary, or normal primary */ - bool local_fetch = SS_PRIMARY_MODE || SS_OFFICIAL_PRIMARY; + bool local_fetch = SSCanFetchLocalSnapshotTxnRelatedInfo(); if (!local_fetch) { return SSMultiXactIdGetUpdateXid(xmax, t_infomask, t_infomask2); } diff --git a/src/gausskernel/storage/access/transam/transam.cpp b/src/gausskernel/storage/access/transam/transam.cpp index 103888be6..b4b18ab1b 100644 --- a/src/gausskernel/storage/access/transam/transam.cpp +++ b/src/gausskernel/storage/access/transam/transam.cpp @@ -396,7 +396,7 @@ bool TransactionIdDidCommit(TransactionId transactionId) /* true if given transa { if (ENABLE_DMS) { /* fetch TXN info locally if either reformer, original primary, or normal primary */ - bool local_fetch = SS_PRIMARY_MODE || SS_OFFICIAL_PRIMARY; + bool local_fetch = SSCanFetchLocalSnapshotTxnRelatedInfo(); if (!local_fetch) { bool didCommit; SSTransactionIdDidCommit(transactionId, &didCommit); diff --git a/src/gausskernel/storage/ipc/procarray.cpp b/src/gausskernel/storage/ipc/procarray.cpp index 581f9d0aa..1c93db7f0 100755 --- a/src/gausskernel/storage/ipc/procarray.cpp +++ b/src/gausskernel/storage/ipc/procarray.cpp @@ -123,6 +123,7 @@ #include "gstrace/storage_gstrace.h" #include "ddes/dms/ss_common_attr.h" #include "ddes/dms/ss_transaction.h" +#include "ddes/dms/ss_reform_common.h" #include "replication/ss_cluster_replication.h" #ifdef ENABLE_UT @@ -1378,7 +1379,7 @@ bool TransactionIdIsInProgress(TransactionId xid, uint32* needSync, bool shortcu if (ENABLE_DMS) { /* fetch TXN info locally if either reformer, original primary, or normal primary */ - bool local_fetch = SS_PRIMARY_MODE || SS_OFFICIAL_PRIMARY; + bool local_fetch = SSCanFetchLocalSnapshotTxnRelatedInfo(); if (!local_fetch) { bool in_progress = true; SSTransactionIdIsInProgress(xid, &in_progress); @@ -2062,12 +2063,8 @@ RETRY: Snapshot result; if (ENABLE_DMS) { - if (SS_IN_REFORM) { - ereport(ERROR, (errmsg("failed to request snapshot as current node is in reform!"))); - return NULL; - } /* fetch TXN info locally if either reformer, original primary, or normal primary */ - if (SS_PRIMARY_MODE || SS_OFFICIAL_PRIMARY) { + if (SSCanFetchLocalSnapshotTxnRelatedInfo()) { result = GetLocalSnapshotData(snapshot); snapshot->snapshotcsn = pg_atomic_read_u64(&t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo); } else { diff --git a/src/include/ddes/dms/dms_api.h b/src/include/ddes/dms/dms_api.h index fe76613c7..6d32268d9 100644 --- a/src/include/ddes/dms/dms_api.h +++ b/src/include/ddes/dms/dms_api.h @@ -32,7 +32,7 @@ extern "C" { #define DMS_LOCAL_MINOR_VER_WEIGHT 1000 #define DMS_LOCAL_MAJOR_VERSION 0 #define DMS_LOCAL_MINOR_VERSION 0 -#define DMS_LOCAL_VERSION 119 +#define DMS_LOCAL_VERSION 120 #define DMS_SUCCESS 0 #define DMS_ERROR (-1) @@ -48,6 +48,7 @@ extern "C" { #define DMS_INDEX_PROFILE_SIZE 96 #define DMS_MAX_IP_LEN 64 #define DMS_MAX_INSTANCES 64 +#define DMS_MAX_NAME_LEN 64 #define DMS_VERSION_MAX_LEN 256 #define DMS_OCK_LOG_PATH_LEN 256 @@ -582,6 +583,7 @@ typedef enum en_dms_wait_event { DMS_EVT_DCS_REQ_XA_IN_USE, DMS_EVT_DCS_REQ_END_XA, +// add new enum at tail, or make adaptations to openGauss DMS_EVT_COUNT, } dms_wait_event_t; @@ -700,6 +702,35 @@ typedef enum en_broadcast_scope { DMS_BROADCAST_TYPE_COUNT, } dms_broadcast_scope_e; +typedef struct st_dv_drc_buf_info { + char data[DMS_MAX_NAME_LEN]; /* user defined resource(page) identifier */ + unsigned char master_id; + unsigned long long copy_insts; /* bitmap for owners, for S mode, more than one owner may exist */ + unsigned char claimed_owner; /* owner */ + unsigned char lock_mode; /* current DRC lock mode */ + unsigned char last_edp; /* the newest edp instance id */ + unsigned char type; /* page or lock */ + unsigned char in_recovery; /* in recovery or not */ + unsigned char copy_promote; /* copy promote to owner, can not release, may need flush */ + unsigned short part_id; /* which partition id that current page belongs to */ + unsigned long long edp_map; /* indicate which instance has current page's EDP(Earlier Dirty Page) */ + unsigned long long lsn; /* the newest edp LSN of current page in the cluster */ + unsigned short len; /* the length of data below */ + unsigned char recovery_skip; /* DRC is accessed in recovery and skip because drc has owner */ + unsigned char recycling; + unsigned char converting_req_info_inst_id; + unsigned char converting_req_info_curr_mode; + unsigned char converting_req_info_req_mode; + unsigned char is_valid; +} dv_drc_buf_info; + +typedef struct st_dms_reform_start_context { + dms_role_t role; + dms_reform_type_t reform_type; + unsigned long long bitmap_participated; + unsigned long long bitmap_reconnect; +} dms_reform_start_context_t; + typedef int(*dms_get_list_stable)(void *db_handle, unsigned long long *list_stable, unsigned char *reformer_id); typedef int(*dms_save_list_stable)(void *db_handle, unsigned long long list_stable, unsigned char reformer_id, unsigned long long list_in, unsigned int save_ctrl); @@ -721,8 +752,7 @@ typedef int(*dms_df_recovery)(void *db_handle, unsigned long long list_in, void typedef int(*dms_opengauss_startup)(void *db_handle); typedef int(*dms_opengauss_recovery_standby)(void *db_handle, int inst_id); typedef int(*dms_opengauss_recovery_primary)(void *db_handle, int inst_id); -typedef void(*dms_reform_start_notify)(void *db_handle, dms_role_t role, unsigned char reform_type, - unsigned long long bitmap_nodes); +typedef void(*dms_reform_start_notify)(void *db_handle, dms_reform_start_context_t *rs_ctx); typedef int(*dms_undo_init)(void *db_handle, unsigned char inst_id); typedef int(*dms_tx_area_init)(void *db_handle, unsigned char inst_id); typedef int(*dms_tx_area_load)(void *db_handle, unsigned char inst_id); @@ -1060,6 +1090,7 @@ typedef struct st_dms_profile { unsigned char scrlock_server_bind_core_end; unsigned char parallel_thread_num; unsigned int max_wait_time; + char gsdb_home[DMS_LOG_PATH_LEN]; } dms_profile_t; typedef struct st_logger_param { @@ -1078,6 +1109,30 @@ typedef enum en_dms_info_id { DMS_INFO_REFORM_LAST = 1, } dms_info_id_e; +typedef struct st_wait_cmd_stat_result { + char name[DMS_MAX_NAME_LEN]; + char p1[DMS_MAX_NAME_LEN]; + char wait_class[DMS_MAX_NAME_LEN]; + unsigned long long wait_count; + unsigned long long wait_time; + unsigned char is_valid; +} wait_cmd_stat_result_t; + +typedef struct st_drc_local_lock_res_result { + char lock_id[DMS_MAX_NAME_LEN]; + unsigned char is_owner; + unsigned char is_locked; + unsigned short count; + unsigned char releasing; + unsigned short shared_count; + unsigned short stat; + unsigned short sid; + unsigned short rmid; + unsigned short rmid_sum; + unsigned char lock_mode; + unsigned char is_valid; +} drc_local_lock_res_result_t; + #ifdef __cplusplus } #endif diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index 28a821c56..b35c35859 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -80,8 +80,8 @@ (SS_REFORM_REFORMER && (g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_NORMAL_OPENGAUSS)) #define SS_PERFORMING_SWITCHOVER \ - (ENABLE_DMS && (g_instance.dms_cxt.SSClusterState > NODESTATE_NORMAL && \ - g_instance.dms_cxt.SSClusterState != NODESTATE_STANDBY_FAILOVER_PROMOTING)) + (ENABLE_DMS && SS_IN_REFORM && \ + (g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_SWITCHOVER_OPENGAUSS)) #define SS_STANDBY_PROMOTING \ (ENABLE_DMS && (g_instance.dms_cxt.SSClusterState == NODESTATE_STANDBY_PROMOTING)) diff --git a/src/include/ddes/dms/ss_dms_recovery.h b/src/include/ddes/dms/ss_dms_recovery.h index 901566f7c..c42b545b0 100644 --- a/src/include/ddes/dms/ss_dms_recovery.h +++ b/src/include/ddes/dms/ss_dms_recovery.h @@ -64,6 +64,7 @@ typedef struct st_reform_info { dms_role_t dms_role; dms_reform_type_t reform_type; unsigned long long bitmap_nodes; + unsigned long long bitmap_reconnect; timeval reform_start_time; timeval reform_end_time; uint64 old_bitmap; // Save the cluster nodes bitmap before REFORM diff --git a/src/include/ddes/dms/ss_reform_common.h b/src/include/ddes/dms/ss_reform_common.h index 9ccb97ed6..b610ed21e 100644 --- a/src/include/ddes/dms/ss_reform_common.h +++ b/src/include/ddes/dms/ss_reform_common.h @@ -53,3 +53,4 @@ int SSXLogFileOpenAnyTLI(XLogSegNo segno, int emode, uint32 sources, char* xlog_ void SSStandbySetLibpqswConninfo(); void SSDoradoRefreshMode(); void SSDoradoUpdateHAmode(); +bool SSPerformingStandbyScenario(); diff --git a/src/include/ddes/dms/ss_transaction.h b/src/include/ddes/dms/ss_transaction.h index dabe4fecf..ba6d9f85e 100644 --- a/src/include/ddes/dms/ss_transaction.h +++ b/src/include/ddes/dms/ss_transaction.h @@ -124,5 +124,6 @@ void SSSendLatestSnapshotToStandby(TransactionId xmin, TransactionId xmax, Commi int SSUpdateLatestSnapshotOfStandby(char *data, uint32 len); int SSReloadReformCtrlPage(uint32 len); void SSRequestAllStandbyReloadReformCtrlPage(); +bool SSCanFetchLocalSnapshotTxnRelatedInfo(); #endif diff --git a/src/include/ddes/dms/ss_xmin.h b/src/include/ddes/dms/ss_xmin.h index bad467482..4eaf42188 100644 --- a/src/include/ddes/dms/ss_xmin.h +++ b/src/include/ddes/dms/ss_xmin.h @@ -56,6 +56,8 @@ typedef struct st_ss_xmin_info { bool global_oldest_xmin_active; slock_t bitmap_active_nodes_lock; uint64 bitmap_active_nodes; + slock_t snapshot_available_lock; + bool snapshot_available; } ss_xmin_info_t; #define SSSnapshotXminHashPartition(hashcode) ((hashcode) % NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS)