!4087 [Resource pooling] Use multiple partition locks for concurrency control on the snapshot xmin hash table
Merge pull request !4087 from 董宁/bugfix6_up
@@ -29,7 +29,9 @@
 #include "storage/procarray.h"
 #include "storage/ipc.h"

 #define DMS_AUXILIARY_SLEEP_TIME (1000) // 1s 1000ms
+#define DMS_AUXILIARY_PRIMARY_SLEEP_TIME (5000) // 5s 5000ms
+#define DMS_AUXILIARY_STANDBY_SLEEP_TIME (1000) // 1s 1000ms

 static void dms_auxiliary_request_shutdown_handler(SIGNAL_ARGS)
 {
     int save_errno = errno;
@@ -74,6 +76,12 @@ void dms_auxiliary_handle_exception()
     /* Prevent interrupts while cleaning up */
     HOLD_INTERRUPTS();

+    if (hash_get_seq_num() > 0) {
+        release_all_seq_scan();
+    }
+
     LWLockReleaseAll();

     /* Report the error to the server log */
     EmitErrorReport();

@@ -113,8 +121,10 @@ void DmsAuxiliaryMain(void)
     ctl.keysize = sizeof(ss_snap_xmin_key_t);
     ctl.entrysize = sizeof(ss_snap_xmin_item_t);
     ctl.hash = tag_hash;
+    ctl.num_partitions = NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS;
     ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo;
-    xmin_info->snap_cache = HeapMemInitHash("DMS snap xmin cache", 60, 30000, &ctl, HASH_ELEM | HASH_FUNCTION);
+    xmin_info->snap_cache = HeapMemInitHash("DMS snapshot xmin cache", 60, 30000, &ctl,
+        HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);

     for (;;) {
         if (t_thrd.dms_aux_cxt.shutdown_requested) {
@@ -129,13 +139,16 @@ void DmsAuxiliaryMain(void)

         if (SS_NORMAL_PRIMARY) {
             MaintXminInPrimary();
+            int rc = WaitLatch(&t_thrd.proc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, DMS_AUXILIARY_PRIMARY_SLEEP_TIME);
+            if (rc & WL_POSTMASTER_DEATH) {
+                gs_thread_exit(1);
+            }
         } else if (SS_NORMAL_STANDBY) {
             MaintXminInStandby();
-        }
-
-        int rc = WaitLatch(&t_thrd.proc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, DMS_AUXILIARY_SLEEP_TIME);
-        if (rc & WL_POSTMASTER_DEATH) {
-            gs_thread_exit(1);
+            int rc = WaitLatch(&t_thrd.proc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, DMS_AUXILIARY_STANDBY_SLEEP_TIME);
+            if (rc & WL_POSTMASTER_DEATH) {
+                gs_thread_exit(1);
+            }
         }
     }
 }
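What the HASH_PARTITION change buys: in PostgreSQL-style dynahash, setting HASH_PARTITION together with ctl.num_partitions makes the table safe for concurrent access, provided each caller holds the lock for the partition its key hashes to. A minimal standalone sketch of that pattern (simplified types; std::mutex stands in for the LWLocks and a sharded std::unordered_map stands in for dynahash; all names here are illustrative, not the kernel's):

#include <array>
#include <cstdint>
#include <functional>
#include <mutex>
#include <unordered_map>

// The key's hash value picks one of N shards, each guarded by its own lock,
// so concurrent writers on different keys rarely contend on the same mutex.
constexpr uint32_t kNumPartitions = 32; // mirrors NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS

class PartitionedXminCache {
public:
    void Insert(uint64_t xmin, int64_t send_time) {
        uint32_t p = Partition(Hash(xmin));
        std::lock_guard<std::mutex> guard(locks_[p]); // per-partition lock only
        shards_[p][xmin] = send_time;
    }
    bool Remove(uint64_t xmin) {
        uint32_t p = Partition(Hash(xmin));
        std::lock_guard<std::mutex> guard(locks_[p]);
        return shards_[p].erase(xmin) > 0;
    }
private:
    static uint32_t Hash(uint64_t key) { return (uint32_t)std::hash<uint64_t>{}(key); }
    static uint32_t Partition(uint32_t hashcode) { return hashcode % kNumPartitions; }
    std::array<std::mutex, kNumPartitions> locks_;
    std::array<std::unordered_map<uint64_t, int64_t>, kNumPartitions> shards_;
};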
@@ -1693,10 +1693,17 @@ static void SSXminInfoPrepare()
 {
     ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo;
     if (g_instance.dms_cxt.SSReformInfo.dms_role == DMS_ROLE_REFORMER) {
+        SpinLockAcquire(&xmin_info->global_oldest_xmin_lock);
+        xmin_info->prev_global_oldest_xmin = xmin_info->global_oldest_xmin;
         xmin_info->global_oldest_xmin_active = false;
         xmin_info->global_oldest_xmin = MaxTransactionId;
+        SpinLockRelease(&xmin_info->global_oldest_xmin_lock);
         for (int i = 0; i < DMS_MAX_INSTANCES; i++) {
-            xmin_info->node_table[i].active = false;
-            xmin_info->node_table[i].notify_oldest_xmin = MaxTransactionId;
+            ss_node_xmin_item_t *item = &xmin_info->node_table[i];
+            SpinLockAcquire(&item->item_lock);
+            item->active = false;
+            item->notify_oldest_xmin = MaxTransactionId;
+            SpinLockRelease(&item->item_lock);
         }
     }
     xmin_info->bitmap_active_nodes = 0;
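The reformer's reset now runs under global_oldest_xmin_lock and stashes the last trustworthy value in the new prev_global_oldest_xmin field before invalidating the live one; readers fall back to it while global_oldest_xmin_active is false (see SSGetGlobalOldestXmin further down). The per-node entries are likewise reset under their item_lock spinlocks. A standalone sketch of the writer side, with std::mutex in place of the spinlock and invented wrapper types:

#include <cstdint>
#include <mutex>

struct GlobalXmin {
    std::mutex lock;
    uint64_t value = UINT64_MAX; // global_oldest_xmin
    uint64_t prev = UINT64_MAX;  // prev_global_oldest_xmin
    bool active = false;         // global_oldest_xmin_active
};

// Reformer side: save the last trustworthy xmin, then invalidate the live one.
// Readers that observe active == false clamp against 'prev' instead, so vacuum
// never advances past a snapshot that was valid just before reform began.
void BeginReform(GlobalXmin &g) {
    std::lock_guard<std::mutex> guard(g.lock);
    g.prev = g.value;
    g.active = false;
    g.value = UINT64_MAX;
}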
@@ -33,6 +33,11 @@

 extern void CalculateLocalLatestSnapshot(bool forceCalc);

+uint32 SSSnapshotXminKeyHashCode(const ss_snap_xmin_key_t *key)
+{
+    return get_hash_value(g_instance.dms_cxt.SSXminInfo.snap_cache, key);
+}
+
 uint64 GetOldestXminInNodeTable()
 {
     ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo;
@@ -70,14 +75,18 @@ void MaintXminInPrimary(void)
     uint64 snap_xmin = MaxTransactionId;
     TimestampTz cur_time = GetCurrentTimestamp();

-    while ((xmin_item = (ss_snap_xmin_item_t*)hash_seq_search(&hash_seq)) != NULL) {
+    if (SS_IN_REFORM) {
+        return;
+    }
+    for (int i = 0; i < NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS; i++) {
+        LWLock* partition_lock = SSSnapshotXminHashPartitionLockByIndex(i);
+        LWLockAcquire(partition_lock, LW_EXCLUSIVE);
+    }
+
+    while ((xmin_item = (ss_snap_xmin_item_t*)hash_seq_search(&hash_seq)) != NULL) {
         if (TimestampDifferenceExceeds(xmin_item->timestamp, cur_time, DMS_MSG_MAX_WAIT_TIME)) {
             ss_snap_xmin_key_t key{.xmin = xmin_item->xmin};
-            hash_search(snap_cache, &key, HASH_REMOVE, NULL);
+            if (hash_search(snap_cache, &key, HASH_REMOVE, NULL) == NULL) {
+                ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
+                    errmsg("snapshot xmin cache hash table corrupted")));
+            }
             continue;
         }

@@ -86,21 +95,24 @@ void MaintXminInPrimary(void)
         }
     }

-    if (snap_xmin == MaxTransactionId) {
-        TimestampTz recent_time = pg_atomic_read_u64((volatile uint64*)&xmin_info->recent_snap_send_time);
-        if (TimestampDifferenceExceeds(cur_time, recent_time, 0)) {
-            return;
-        }
+    for (int i = NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS - 1; i >= 0; i--) {
+        LWLock* partition_lock = SSSnapshotXminHashPartitionLockByIndex(i);
+        LWLockRelease(partition_lock);
     }

     xmin_info->snap_oldest_xmin = snap_xmin;
     uint64 new_global_xmin = GetOldestXminInNodeTable();
     if (TransactionIdPrecedes(snap_xmin, new_global_xmin)) {
         new_global_xmin = snap_xmin;
     }

+    if (new_global_xmin == MaxTransactionId) {
+        return;
+    }
+
     SpinLockAcquire(&xmin_info->global_oldest_xmin_lock);
-    xmin_info->global_oldest_xmin = new_global_xmin;
+    if (xmin_info->global_oldest_xmin_active) {
+        xmin_info->global_oldest_xmin = new_global_xmin;
+    }
     SpinLockRelease(&xmin_info->global_oldest_xmin_lock);
 }
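MaintXminInPrimary's full-table sweep cannot rely on a single partition lock, so it takes all of them in ascending index order before hash_seq_search and releases them in descending order afterwards. Acquiring in one fixed order is what keeps two concurrent all-partition lockers from deadlocking with each other. The locking skeleton, reduced to standalone C++ (std::mutex for the LWLocks):

#include <array>
#include <mutex>

constexpr int kNumPartitions = 32;
std::array<std::mutex, kNumPartitions> partition_locks; // stand-in for the LWLock tranche

// Take every partition lock in ascending index order, scan the whole table,
// then release in reverse. While all locks are held, no other backend can
// insert into or remove from any partition.
template <typename ScanFn>
void SweepAllPartitions(ScanFn scan) {
    for (int i = 0; i < kNumPartitions; i++) {
        partition_locks[i].lock();
    }
    scan();
    for (int i = kNumPartitions - 1; i >= 0; i--) {
        partition_locks[i].unlock();
    }
}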
@@ -127,35 +139,36 @@ bool RecordSnapshotBeforeSend(uint8 inst_id, uint64 xmin)
     }

     ss_snap_xmin_key_t key = {.xmin = xmin};
+    uint32 key_hash = SSSnapshotXminKeyHashCode(&key);
+    LWLock *partition_lock = SSSnapshotXminHashPartitionLock(key_hash);
+    LWLockAcquire(partition_lock, LW_EXCLUSIVE);
     ss_snap_xmin_item_t *xmin_item = (ss_snap_xmin_item_t*)hash_search(snap_cache, &key, HASH_ENTER_NULL, NULL);
     if (xmin_item == NULL) {
+        LWLockRelease(partition_lock);
         ereport(WARNING, (errmodule(MOD_DMS), errmsg("insert snapshot into snap_cache table failed, "
             "capacity is not enough")));
         return false;
     }
     xmin_item->xmin = xmin;
     TimestampTz send_time = GetCurrentTimestamp();
     bool ret = false;
     do {
         uint64 recent_time = pg_atomic_read_u64((volatile uint64*)&xmin_info->recent_snap_send_time);
         if ((uint64)send_time <= recent_time) {
             break;
         }
         ret = pg_atomic_compare_exchange_u64((volatile uint64*)&xmin_info->recent_snap_send_time, &recent_time, send_time);
     } while (!ret);
     xmin_item->timestamp = send_time;
+    LWLockRelease(partition_lock);
     return true;
 }

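The do/while around pg_atomic_compare_exchange_u64 advances recent_snap_send_time monotonically: a sender installs its timestamp only if it is still the newest, and retries when a concurrent sender wins the race. The same loop in isolation, as a standalone sketch with std::atomic standing in for the pg_atomic API:

#include <atomic>
#include <cstdint>

// Advance a shared timestamp to a new maximum without a lock. On failure,
// compare_exchange refreshes 'observed' with the current value, and the loop
// condition re-checks whether our timestamp is still the newest, so a slower
// sender can never drag the value backwards.
void AdvanceSendTime(std::atomic<uint64_t> &recent, uint64_t send_time) {
    uint64_t observed = recent.load();
    while (send_time > observed) {
        if (recent.compare_exchange_weak(observed, send_time)) {
            break;
        }
    }
}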
 uint64 SSGetGlobalOldestXmin(uint64 globalxmin)
 {
     ss_xmin_info_t *xmin_info = &g_instance.dms_cxt.SSXminInfo;
     if (!xmin_info->global_oldest_xmin_active) {
         return globalxmin;
     }

     uint64 ret_globalxmin = globalxmin;
     SpinLockAcquire(&xmin_info->global_oldest_xmin_lock);
+    if (!xmin_info->global_oldest_xmin_active) {
+        if (TransactionIdPrecedes(xmin_info->prev_global_oldest_xmin, globalxmin)) {
+            ret_globalxmin = xmin_info->prev_global_oldest_xmin;
+        }
+        SpinLockRelease(&xmin_info->global_oldest_xmin_lock);
+        return ret_globalxmin;
+    }

     if (TransactionIdPrecedes(xmin_info->global_oldest_xmin, globalxmin)) {
         ret_globalxmin = xmin_info->global_oldest_xmin;
     }
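SSGetGlobalOldestXmin now re-checks global_oldest_xmin_active after taking the spinlock, because reform may clear the flag between the unlocked fast path and the locked read; in that window the saved prev_global_oldest_xmin keeps the answer conservative. Reduced to a standalone sketch (invented types, std::mutex for the spinlock, std::atomic for the racily-read flag):

#include <atomic>
#include <cstdint>
#include <mutex>

struct GlobalXminState {
    std::mutex lock;
    std::atomic<bool> active{false}; // global_oldest_xmin_active
    uint64_t value = UINT64_MAX;     // global_oldest_xmin
    uint64_t prev = UINT64_MAX;      // prev_global_oldest_xmin
};

uint64_t GetGlobalOldestXmin(GlobalXminState &g, uint64_t globalxmin) {
    if (!g.active.load()) {
        return globalxmin; // fast path: shared value not usable yet
    }
    std::lock_guard<std::mutex> guard(g.lock);
    if (!g.active.load()) {
        // Deactivated concurrently (reform started): fall back to the saved
        // previous value so the result stays conservative.
        return g.prev < globalxmin ? g.prev : globalxmin;
    }
    return g.value < globalxmin ? g.value : globalxmin;
}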
@@ -194,7 +207,9 @@ void SSUpdateNodeOldestXmin(uint8 inst_id, unsigned long long oldest_xmin)

     SpinLockAcquire(&xmin_info->bitmap_active_nodes_lock);
     if (xmin_info->bitmap_active_nodes == reform_info->bitmap_nodes) {
+        SpinLockAcquire(&xmin_info->global_oldest_xmin_lock);
         xmin_info->global_oldest_xmin_active = true;
+        SpinLockRelease(&xmin_info->global_oldest_xmin_lock);
     }
     SpinLockRelease(&xmin_info->bitmap_active_nodes_lock);
 }
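SSUpdateNodeOldestXmin flips global_oldest_xmin_active only once bitmap_active_nodes matches the reform node bitmap, i.e. every node has reported an xmin, and now does so under global_oldest_xmin_lock as well. A standalone sketch of the activation step (invented wrapper types; note the nested lock order, bitmap lock first, must match every other site that takes both):

#include <cstdint>
#include <mutex>

struct XminActivation {
    std::mutex bitmap_lock;          // bitmap_active_nodes_lock
    std::mutex global_lock;          // global_oldest_xmin_lock
    uint64_t active_nodes = 0;       // bitmap_active_nodes
    bool global_xmin_active = false; // global_oldest_xmin_active
};

// Each node that reports its xmin sets its bit; once every expected node has
// reported, the aggregated global xmin becomes trustworthy again.
void MarkNodeReported(XminActivation &s, uint8_t inst_id, uint64_t all_nodes_bitmap) {
    std::lock_guard<std::mutex> bitmap_guard(s.bitmap_lock);
    s.active_nodes |= (1ULL << inst_id);
    if (s.active_nodes == all_nodes_bitmap) {
        std::lock_guard<std::mutex> global_guard(s.global_lock);
        s.global_xmin_active = true;
    }
}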
@@ -212,9 +212,9 @@ static void knl_g_dms_init(knl_g_dms_context *dms_cxt)
     }
     xmin_info->snap_cache = NULL;
     xmin_info->snap_oldest_xmin = MaxTransactionId;
     xmin_info->recent_snap_send_time = 0;
     SpinLockInit(&xmin_info->global_oldest_xmin_lock);
     xmin_info->global_oldest_xmin = MaxTransactionId;
+    xmin_info->prev_global_oldest_xmin = MaxTransactionId;
     xmin_info->global_oldest_xmin_active = false;
     SpinLockInit(&xmin_info->bitmap_active_nodes_lock);
     xmin_info->bitmap_active_nodes = 0;
@@ -198,7 +198,8 @@ static const char *BuiltinTrancheNames[] = {
     "AuditIndextblLock",
     "PCABufferContentLock",
     "XlogTrackPartLock",
-    "SSTxnStatusCachePartLock"
+    "SSTxnStatusCachePartLock",
+    "SSSnapshotXminCachePartLock"
 };

 static void RegisterLWLockTranches(void);
@@ -662,6 +663,10 @@ static void InitializeLWLocks(int numLocks)
         LWLockInitialize(&lock->lock, LWTRANCHE_SS_TXNSTATUS_PARTITION);
     }

+    for (id = 0; id < NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS; id++, lock++) {
+        LWLockInitialize(&lock->lock, LWTRANCHE_SS_SNAPSHOT_XMIN_PARTITION);
+    }
+
     Assert((lock - t_thrd.shemem_ptr_cxt.mainLWLockArray) == NumFixedLWLocks);

     for (id = NumFixedLWLocks; id < numLocks; id++, lock++) {
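Note that the old array ended with "SSTxnStatusCachePartLock" without a trailing comma; appending a new name while forgetting to add that comma would concatenate the two string literals and silently shorten the array by one. A hypothetical guard (not part of this patch) that makes the names/ids consistency demanded by the WARNING comments checkable at compile time:

#include <cstddef>

// Hypothetical sketch: tie the names array length to the enum tail, so a
// missing comma (which merges two adjacent string literals into one entry)
// fails the build instead of shifting every later tranche name.
enum BuiltinTrancheIdsSketch { TRANCHE_A, TRANCHE_B, TRANCHE_C, NUM_BUILTIN_TRANCHES };

static const char *BuiltinTrancheNamesSketch[] = {
    "TrancheA",
    "TrancheB",
    "TrancheC",
};

static_assert(sizeof(BuiltinTrancheNamesSketch) / sizeof(BuiltinTrancheNamesSketch[0]) == NUM_BUILTIN_TRANCHES,
              "BuiltinTrancheNames must have one entry per BuiltinTrancheIds value");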
@@ -50,14 +50,20 @@ typedef struct st_ss_xmin_info {
     ss_node_xmin_item_t node_table[DMS_MAX_INSTANCES];
     struct HTAB* snap_cache;
     uint64 snap_oldest_xmin;
     volatile TimestampTz recent_snap_send_time;
     slock_t global_oldest_xmin_lock;
     uint64 global_oldest_xmin;
+    uint64 prev_global_oldest_xmin;
     bool global_oldest_xmin_active;
     slock_t bitmap_active_nodes_lock;
     uint64 bitmap_active_nodes;
 } ss_xmin_info_t;

+#define SSSnapshotXminHashPartition(hashcode) ((hashcode) % NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS)
+#define SSSnapshotXminHashPartitionLock(hashcode) \
+    (&t_thrd.shemem_ptr_cxt.mainLWLockArray[FirstSSSnapshotXminCacheLock + SSSnapshotXminHashPartition(hashcode)].lock)
+#define SSSnapshotXminHashPartitionLockByIndex(i) \
+    (&t_thrd.shemem_ptr_cxt.mainLWLockArray[FirstSSSnapshotXminCacheLock + (i)].lock)
+uint32 SSSnapshotXminKeyHashCode(const ss_snap_xmin_key_t *key);
 void MaintXminInPrimary(void);
 void MaintXminInStandby(void);
 bool RecordSnapshotBeforeSend(uint8 inst_id, uint64 xmin);
@@ -151,6 +151,9 @@ const struct LWLOCK_PARTITION_DESC LWLockPartInfo[] = {
 /* Number of partitions of the txnstatus mapping hashtable */
 #define NUM_TXNSTATUS_CACHE_PARTITIONS 256

+/* Number of partitions of the snapshot xmin cache hashtable */
+#define NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS 32
+
 /*
  * WARNING---Please keep the order of LWLockTrunkOffset and BuiltinTrancheIds consistent!!!
  */
@@ -200,8 +203,10 @@ const struct LWLOCK_PARTITION_DESC LWLockPartInfo[] = {

 /* txn status cache */
 #define FirstTxnStatusCacheLock (FirstXlogTrackLock + NUM_XLOG_TRACK_PARTITIONS)
+/* shared-storage snapshot xmin cache */
+#define FirstSSSnapshotXminCacheLock (FirstTxnStatusCacheLock + NUM_TXNSTATUS_CACHE_PARTITIONS)
 /* must be last: */
-#define NumFixedLWLocks (FirstTxnStatusCacheLock + NUM_TXNSTATUS_CACHE_PARTITIONS)
+#define NumFixedLWLocks (FirstSSSnapshotXminCacheLock + NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS)
 /*
  * WARNING----Please keep BuiltinTrancheIds and BuiltinTrancheNames consistent!!!
  *
@@ -282,6 +287,7 @@ enum BuiltinTrancheIds
     LWTRANCHE_PCA_BUFFER_CONTENT,
     LWTRANCHE_XLOG_TRACK_PARTITION,
     LWTRANCHE_SS_TXNSTATUS_PARTITION,
+    LWTRANCHE_SS_SNAPSHOT_XMIN_PARTITION,
     /*
      * Each trancheId above should have a corresponding item in BuiltinTrancheNames;
      */
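For reference, the resulting fixed-lock layout arithmetic:

// Worked example of the new tail of the fixed LWLock array (the base value
// 1000 is hypothetical; only the offsets come from this patch):
//   suppose FirstTxnStatusCacheLock == 1000, then
//   FirstSSSnapshotXminCacheLock = 1000 + NUM_TXNSTATUS_CACHE_PARTITIONS (256) = 1256
//   NumFixedLWLocks              = 1256 + NUM_SS_SNAPSHOT_XMIN_CACHE_PARTITIONS (32) = 1288
// so SSSnapshotXminHashPartitionLockByIndex(i) addresses slots 1256..1287, and the
// new InitializeLWLocks() loop initializes exactly those 32 slots, keeping the
// Assert(lock - mainLWLockArray == NumFixedLWLocks) invariant intact.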