add 半无锁 dynamic hash 操作

This commit is contained in:
arcoalien@qq.com
2023-11-02 11:43:58 +08:00
parent 736b67eac1
commit 66ef5aec86
6 changed files with 86 additions and 93 deletions

View File

@ -1607,6 +1607,12 @@ void* buf_hash_operate(HTAB* hashp, const BufferTag* keyPtr, uint32 hashvalue, b
case HASH_REMOVE:
if (currBucket != NULL) {
CLEAR_BUFFERTAG(*((BufferTag*)(ELEMENTKEY(currBucket))));
pg_memory_barrier();
/* remove record from hash bucket's chain. */
*prevBucketPtr = currBucket->link;
freelist_idx = FREELIST_IDX(hctl, hashvalue);
/* if partitioned, must lock to touch nentries and freeList */
if (IS_PARTITIONED(hctl)) {
@ -1615,9 +1621,6 @@ void* buf_hash_operate(HTAB* hashp, const BufferTag* keyPtr, uint32 hashvalue, b
Assert(hctl->freeList[freelist_idx].nentries > 0);
hctl->freeList[freelist_idx].nentries--;
/* remove record from hash bucket's chain. */
*prevBucketPtr = currBucket->link;
/* add the record to the freelist for this table. */
currBucket->link = hctl->freeList[freelist_idx].freeList;
hctl->freeList[freelist_idx].freeList = currBucket;
@ -1654,14 +1657,16 @@ void* buf_hash_operate(HTAB* hashp, const BufferTag* keyPtr, uint32 hashvalue, b
}
}
/* link into hashbucket chain */
*prevBucketPtr = currBucket;
currBucket->link = NULL;
/* copy key into record */
currBucket->hashvalue = hashvalue;
BUFFERTAGS_PTR_SET((BufferTag*)(ELEMENTKEY(currBucket)), keyPtr);
pg_memory_barrier();
/* link into hashbucket chain */
*prevBucketPtr = currBucket;
/*
* Caller is expected to fill the data field on return. DO NOT
* insert any code that could possibly throw error here, as doing

View File

@ -539,7 +539,6 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
int ret = DMS_SUCCESS;
int buf_id = -1;
uint32 hash;
LWLock *partition_lock = NULL;
BufferDesc *buf_desc = NULL;
RelFileNode relfilenode = tag->rnode;
bool get_lock = false;
@ -557,24 +556,13 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
*buf_ctrl = NULL;
hash = BufTableHashCode(tag);
partition_lock = BufMappingPartitionLock(hash);
uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount;
PG_TRY();
{
do {
get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED);
if (!get_lock) {
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
"lock:%p",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, partition_lock))));
ret = GS_TIMEOUT;
break;
}
buf_id = BufTableLookup(tag, hash);
if (buf_id < 0) {
LWLockRelease(partition_lock);
break;
}
@ -587,7 +575,11 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
(void)PinBuffer(buf_desc, NULL);
is_seg = false;
}
LWLockRelease(partition_lock);
if (!BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) {
DmsReleaseBuffer(buf_desc->buf_id + 1, is_seg);
break;
}
bool wait_success = SSWaitIOTimeout(buf_desc);
if (!wait_success) {
@ -710,25 +702,13 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
int buf_id = -1;
BufferTag* tag = (BufferTag *)pageid;
uint32 hash;
LWLock *partition_lock = NULL;
uint64 buf_state;
int ret = DMS_SUCCESS;
bool get_lock;
hash = BufTableHashCode(tag);
partition_lock = BufMappingPartitionLock(hash);
bool get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED);
if (!get_lock) {
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
"lwlock:%p",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, partition_lock))));
return GS_TIMEOUT;
}
buf_id = BufTableLookup(tag, hash);
if (buf_id < 0) {
/* not found in shared buffer */
LWLockRelease(partition_lock);
return ret;
}
@ -740,9 +720,9 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
buf_desc = GetBufferDescriptor(buf_id);
if (SS_PRIMARY_MODE) {
buf_state = LockBufHdr(buf_desc);
if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 || BUF_STATE_GET_USAGECOUNT(buf_state) != 0) {
if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 || BUF_STATE_GET_USAGECOUNT(buf_state) != 0 ||
!BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) {
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(partition_lock);
return DMS_ERROR;
}
@ -752,7 +732,6 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(partition_lock);
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
buf_ctrl->seg_fileno = EXTENT_INVALID;
buf_ctrl->seg_blockno = InvalidBlockNumber;
@ -775,7 +754,6 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
}
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(partition_lock);
return ret;
}
@ -785,7 +763,11 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
(void)PinBuffer(buf_desc, NULL);
}
LWLockRelease(partition_lock);
if (!BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) {
DmsReleaseBuffer(buf_id + 1, IsSegmentBufferID(buf_id));
return ret;
}
bool wait_success = SSWaitIOTimeout(buf_desc);
if (!wait_success) {
@ -1359,7 +1341,6 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d
{
int buf_id;
uint32 hash;
LWLock *partition_lock = NULL;
BufferTag *tag = (BufferTag *)pageid;
BufferDesc *buf_desc;
bool ret = true;
@ -1378,21 +1359,10 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d
#endif
hash = BufTableHashCode(tag);
partition_lock = BufMappingPartitionLock(hash);
uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount;
PG_TRY();
{
bool get_lock = SSLWLockAcquireTimeout(partition_lock, LW_SHARED);
if (!get_lock) {
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
"lwlock:%p",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, partition_lock))));
ret = false;
break;
}
buf_id = BufTableLookup(tag, hash);
if (buf_id >= 0) {
buf_desc = GetBufferDescriptor(buf_id);
@ -1402,7 +1372,12 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
(void)PinBuffer(buf_desc, NULL);
}
LWLockRelease(partition_lock);
if (!BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) {
SSUnPinBuffer(buf_desc);
*ret_buf_desc = NULL;
break;
}
bool wait_success = SSWaitIOTimeout(buf_desc);
if (!wait_success) {
@ -1415,7 +1390,6 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d
*is_valid = (pg_atomic_read_u64(&buf_desc->state) & BM_VALID) != 0;
*ret_buf_desc = buf_desc;
} else {
LWLockRelease(partition_lock);
*ret_buf_desc = NULL;
}
}

View File

@ -83,7 +83,6 @@ BTStack _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access, boo
ItemId itemid;
IndexTuple itup;
BlockNumber blkno;
BlockNumber par_blkno;
BTStack new_stack = NULL;
/*
@ -114,7 +113,6 @@ BTStack _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access, boo
itemid = PageGetItemId(page, offnum);
itup = (IndexTuple)PageGetItem(page, itemid);
blkno = BTreeInnerTupleGetDownLink(itup);
par_blkno = BufferGetBlockNumber(*bufP);
/*
* We need to save the location of the index entry we chose in the
@ -128,7 +126,7 @@ BTStack _bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access, boo
*/
if (needStack) {
new_stack = (BTStack)palloc(sizeof(BTStackData));
new_stack->bts_blkno = par_blkno;
new_stack->bts_blkno = BufferGetBlockNumber(*bufP);
new_stack->bts_offset = offnum;
new_stack->bts_btentry = blkno;
new_stack->bts_parent = stack_in;

View File

@ -2711,10 +2711,9 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
/* determine its hash code and partition lock ID */
new_hash = BufTableHashCode(&new_tag);
new_partition_lock = BufMappingPartitionLock(new_hash);
retry:
/* see if the block is in the buffer pool already */
(void)LWLockAcquire(new_partition_lock, LW_SHARED);
pgstat_report_waitevent(WAIT_EVENT_BUF_HASH_SEARCH);
buf_id = BufTableLookup(&new_tag, new_hash);
pgstat_report_waitevent(WAIT_EVENT_END);
@ -2728,8 +2727,10 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
valid = PinBuffer(buf, strategy);
/* Can release the mapping lock as soon as we've pinned it */
LWLockRelease(new_partition_lock);
if (!BUFFERTAGS_PTR_EQUAL(&buf->tag, &new_tag)) {
UnpinBuffer(buf, true);
goto retry;
}
*found = TRUE;
@ -2761,11 +2762,7 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
return buf;
}
/*
* Didn't find it in the buffer pool. We'll have to initialize a new
* buffer. Remember to unlock the mapping lock while doing the work.
*/
LWLockRelease(new_partition_lock);
new_partition_lock = BufMappingPartitionLock(new_hash);
/* Loop here in case we have to try another victim buffer */
for (;;) {
bool needGetLock = false;
@ -2899,6 +2896,7 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
* To change the association of a valid buffer, we'll need to have
* exclusive lock on both the old and new mapping partitions.
*/
old_flags = buf_state & BUF_FLAG_MASK;
if (old_flags & BM_TAG_VALID) {
/*
* Need to compute the old tag's hashcode and partition lock ID.
@ -2921,6 +2919,7 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
old_partition_lock = NULL;
}
buf_state = LockBufHdr(buf);
/*
* Try to make a hashtable entry for the buffer under its new tag.
* This could fail because while we were writing someone else
@ -2936,6 +2935,7 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
* pool in the first place. First, give up the buffer we were
* planning to use.
*/
UnlockBufHdr(buf, buf_state);
UnpinBuffer(buf, true);
/* Can give up that buffer's mapping partition lock now */
@ -2972,10 +2972,6 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
return buf;
}
/*
* Need to lock the buffer header too in order to change its tag.
*/
buf_state = LockBufHdr(buf);
/*
* Somebody could have pinned or re-dirtied the buffer while we were
* doing the I/O and making the new hashtable entry. If so, we can't
@ -3001,11 +2997,12 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
}
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
LWLockRelease(old_partition_lock);
}
UnlockBufHdr(buf, buf_state);
LWLockRelease(new_partition_lock);
UnpinBuffer(buf, true);
}
@ -3037,8 +3034,6 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
}
UnlockBufHdr(buf, buf_state);
if (ENABLE_DMS) {
GetDmsBufCtrl(buf->buf_id)->lock_mode = DMS_LOCK_NULL;
GetDmsBufCtrl(buf->buf_id)->been_loaded = false;
@ -3064,7 +3059,7 @@ BufferDesc *BufferAlloc(const RelFileNode &rel_file_node, char relpersistence, F
buf->extra->seg_blockno = InvalidBlockNumber;
}
LWLockRelease(new_partition_lock);
UnlockBufHdr(buf, buf_state);
/*
* Buffer contents are currently invalid. Try to get the io_in_progress
* lock. If StartBufferIO returns false, then someone else managed to
@ -7271,7 +7266,6 @@ void ForgetBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum)
SMgrRelation smgr = smgropen(rnode, InvalidBackendId);
BufferTag tag; /* identity of target block */
uint32 hash; /* hash value for tag */
LWLock* partitionLock; /* buffer partition lock for it */
int bufId;
BufferDesc *bufHdr;
uint64 bufState;
@ -7281,12 +7275,9 @@ void ForgetBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum)
/* determine its hash code and partition lock ID */
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/* see if the block is in the buffer pool */
LWLockAcquire(partitionLock, LW_SHARED);
bufId = BufTableLookup(&tag, hash);
LWLockRelease(partitionLock);
/* didn't find it, so nothing to do */
if (bufId < 0) {
@ -7297,6 +7288,10 @@ void ForgetBuffer(RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum)
bufHdr = GetBufferDescriptor(bufId);
bufState = LockBufHdr(bufHdr);
if (!BUFFERTAGS_PTR_EQUAL(&bufHdr->tag, &tag)) {
UnlockBufHdr(bufHdr, bufState);
return;
}
/*
* The buffer might have been evicted after we released the partition lock and
* before we acquired the buffer header lock. If so, the buffer we've
@ -7381,15 +7376,22 @@ bool IsPageHitBufferPool(RelFileNode& node, ForkNumber forkNum, BlockNumber bloc
INIT_BUFFERTAG(newTag, node, forkNum, blockNum);
uint32 new_hash = BufTableHashCode(&newTag);
LWLock *new_partition_lock = BufMappingPartitionLock(new_hash);
/* see if the block is in the buffer pool already */
(void)LWLockAcquire(new_partition_lock, LW_SHARED);
bufId = BufTableLookup(&newTag, new_hash);
LWLockRelease(new_partition_lock);
if (bufId != -1) {
return true;
if (bufId < 0) {
return false;
}
return false;
BufferDesc* bufHdr = GetBufferDescriptor(bufId);
uint64 bufState = LockBufHdr(bufHdr);
if (!BUFFERTAGS_PTR_EQUAL(&bufHdr->tag, &newTag)) {
UnlockBufHdr(bufHdr, bufState);
return false;
}
UnlockBufHdr(bufHdr, bufState);
return true;
}
void buffer_in_progress_pop()

View File

@ -727,18 +727,31 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
INIT_BUFFERTAG(new_tag, rnode, forkNum, blockNum);
new_hash = BufTableHashCode(&new_tag);
new_partition_lock = BufMappingPartitionLock(new_hash);
LWLockAcquire(new_partition_lock, LW_SHARED);
retry:
int buf_id = BufTableLookup(&new_tag, new_hash);
if (buf_id >= 0) {
return FoundBufferInHashTable(buf_id, new_partition_lock, foundPtr);
buf = GetBufferDescriptor(buf_id);
bool valid = SegPinBuffer(buf);
if (!BUFFERTAGS_PTR_EQUAL(&buf->tag, &new_tag)) {
SegUnpinBuffer(buf);
goto retry;
}
*foundPtr = true;
if (!valid) {
if (SegStartBufferIO(buf, true)) {
*foundPtr = false;
}
}
return buf;
}
*foundPtr = FALSE;
LWLockRelease(new_partition_lock);
new_partition_lock = BufMappingPartitionLock(new_hash);
for (;;) {
ReservePrivateRefCountEntry();
@ -771,6 +784,7 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
}
}
old_flags = buf_state & BUF_FLAG_MASK;
old_flag_valid = old_flags & BM_TAG_VALID;
if (old_flag_valid) {
@ -787,9 +801,11 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
old_partition_lock = NULL;
}
buf_state = LockBufHdr(buf);
buf_id = BufTableInsert(&new_tag, new_hash, buf->buf_id);
if (buf_id >= 0) {
UnlockBufHdr(buf, buf_state);
SegUnpinBuffer(buf);
if (old_flag_valid && old_partition_lock != new_partition_lock)
LWLockRelease(old_partition_lock);
@ -797,7 +813,6 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
return FoundBufferInHashTable(buf_id, new_partition_lock, foundPtr);
}
buf_state = LockBufHdr(buf);
old_flags = buf_state & BUF_FLAG_MASK;
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY)) {
@ -815,11 +830,11 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
break;
}
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if (old_flag_valid && old_partition_lock != new_partition_lock) {
LWLockRelease(old_partition_lock);
}
UnlockBufHdr(buf, buf_state);
LWLockRelease(new_partition_lock);
SegUnpinBuffer(buf);
}
@ -828,7 +843,6 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
BUF_USAGECOUNT_MASK);
buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
UnlockBufHdr(buf, buf_state);
if (ENABLE_DMS) {
GetDmsBufCtrl(buf->buf_id)->lock_mode = DMS_LOCK_NULL;
@ -842,6 +856,7 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
}
}
LWLockRelease(new_partition_lock);
UnlockBufHdr(buf, buf_state);
*foundPtr = !SegStartBufferIO(buf, true);
return buf;

View File

@ -479,9 +479,6 @@ Buffer try_get_moved_pagebuf(RelFileNode *rnode, int forknum, BlockNumber logic_
INIT_BUFFERTAG(tag, *rnode, forknum, logic_blocknum);
uint32 hashcode = BufTableHashCode(&tag);
LWLock *partition_lock = BufMappingPartitionLock(hashcode);
LWLockAcquire(partition_lock, LW_SHARED);
int buf_id = BufTableLookup(&tag, hashcode);
if (buf_id >= 0) {
BufferDesc *buf = GetBufferDescriptor(buf_id);
@ -491,7 +488,10 @@ Buffer try_get_moved_pagebuf(RelFileNode *rnode, int forknum, BlockNumber logic_
/* Pin the buffer so that it cannot be invalidated by others */
bool valid = PinBuffer(buf, NULL);
LWLockRelease(partition_lock);
if (!BUFFERTAGS_PTR_EQUAL(&buf->tag, &tag)) {
UnpinBuffer(buf, true);
return InvalidBuffer;
}
if (!valid) {
UnpinBuffer(buf, true);
@ -500,7 +500,6 @@ Buffer try_get_moved_pagebuf(RelFileNode *rnode, int forknum, BlockNumber logic_
return BufferDescriptorGetBuffer(buf);
}
LWLockRelease(partition_lock);
return InvalidBuffer;
}