!445 Parallel hash: fix bug when increasing the number of hash buckets
Merge pull request !445 from 吴岳川/hash_bug
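The changes below replace open-coded chunk->data arithmetic and offsetof(HashMemoryChunkData, data) sizing with the HASH_CHUNK_DATA() and HASH_CHUNK_HEADER_SIZE macros, so tuples are always placed at a MAXALIGN'd offset past the chunk header. For orientation, these macros have roughly the following shape in upstream PostgreSQL's hashjoin.h; the exact definitions this patch relies on may differ in detail:

    /* Sketch only: macro shapes as in upstream PostgreSQL hashjoin.h. */
    #define HASH_CHUNK_SIZE         (32 * 1024L)
    #define HASH_CHUNK_THRESHOLD    (HASH_CHUNK_SIZE / 4)
    /* header rounded up so the tuple area that follows is MAXALIGN'd */
    #define HASH_CHUNK_HEADER_SIZE  MAXALIGN(sizeof(HashMemoryChunkData))
    #define HASH_CHUNK_DATA(hc)     (((char *) (hc)) + HASH_CHUNK_HEADER_SIZE)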
@@ -1135,7 +1135,7 @@ static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 
         /* process all tuples stored in this chunk (and then free it) */
         while (idx < oldchunks->used) {
-            HashJoinTuple hashTuple = (HashJoinTuple)(oldchunks->data + idx);
+            HashJoinTuple hashTuple = (HashJoinTuple)(HASH_CHUNK_DATA(oldchunks) + idx);
             MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
             int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
             int bucketno;
@@ -1411,7 +1411,7 @@ static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
 
         /* Repartition all tuples in this chunk. */
         while (idx < chunk->used) {
-            HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx);
+            HashJoinTuple hashTuple = (HashJoinTuple)(HASH_CHUNK_DATA(chunk) + idx);
             MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
             HashJoinTuple copyTuple;
             HashJoinTuple shared;
@@ -1580,7 +1580,7 @@ static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
     while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s))) {
         size_t idx = 0;
         while (idx < chunk->used) {
-            HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx);
+            HashJoinTuple hashTuple = (HashJoinTuple)(HASH_CHUNK_DATA(chunk) + idx);
             int bucketno;
             int batchno;
 
@@ -1588,7 +1588,7 @@ static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
             Assert(batchno == 0);
 
             /* add the tuple to the proper bucket */
-            ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], hashTuple, hashTuple->next.shared);
+            ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], hashTuple, hashTuple);
 
             /* advance index past the tuple */
             idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
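The hunk above is the core of the bucket-growth fix: the third argument of ExecParallelHashPushTuple is the value installed as the new head of the bucket chain, so it has to identify the tuple being inserted; passing hashTuple->next.shared appears to have installed a stale next pointer instead whenever the number of buckets was increased. For reference, the upstream PostgreSQL helper has roughly this shape (a sketch only; the variant in this code base evidently takes the tuple pointer itself rather than a dsa_pointer):

    /* Upstream PostgreSQL nodeHash.c helper, shown for context. */
    static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head,
                                                 HashJoinTuple tuple,
                                                 dsa_pointer tuple_shared)
    {
        for (;;) {
            /* link the tuple in front of the current head ... */
            tuple->next.shared = dsa_pointer_atomic_read(head);
            /* ... then try to install the tuple itself as the new head */
            if (dsa_pointer_atomic_compare_exchange(head, &tuple->next.shared, tuple_shared))
                break;
        }
    }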
@@ -2041,11 +2041,19 @@ bool ExecParallelScanHashBucket(HashJoinState* hjstate, ExprContext* econtext)
             /* reset temp memory each time to avoid leaks from qual expr */
             ResetExprContext(econtext);
 
-            if (ExecQual(hjclauses, econtext, false)) {
+            if (ExecQual(hjclauses, econtext, false) ||
+                (hjstate->js.nulleqqual != NIL && ExecQual(hjstate->js.nulleqqual, econtext, false))) {
                 hjstate->hj_CurTuple = hashTuple;
                 return true;
             }
         }
+        /*
+         * For right Semi/Anti joins we delete matched tuples from the hash table to make the
+         * next matching faster, so the pointer hj_PreTuple is designed to follow hj_CurTuple
+         * and help us clear the hash table.
+         */
+        if (hjstate->js.jointype == JOIN_RIGHT_SEMI || hjstate->js.jointype == JOIN_RIGHT_ANTI) {
+            hjstate->hj_PreTuple = hashTuple;
+        }
 
         hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
     }
@@ -2716,10 +2724,10 @@ static void* dense_alloc(HashJoinTable hashtable, Size size)
     */
    if (size > HASH_CHUNK_THRESHOLD) {
        /* allocate new chunk and put it at the beginning of the list */
-        newChunk = (HashMemoryChunk)MemoryContextAlloc(hashtable->batchCxt, offsetof(HashMemoryChunkData, data) + size);
+        newChunk = (HashMemoryChunk)MemoryContextAlloc(hashtable->batchCxt, HASH_CHUNK_HEADER_SIZE + size);
        newChunk->maxlen = size;
-        newChunk->used = 0;
-        newChunk->ntuples = 0;
+        newChunk->used = size;
+        newChunk->ntuples = 1;
 
        /*
         * Add this chunk to the list after the first existing chunk, so that
@@ -2733,10 +2741,7 @@ static void* dense_alloc(HashJoinTable hashtable, Size size)
            hashtable->chunks = newChunk;
        }
 
-        newChunk->used += size;
-        newChunk->ntuples += 1;
-
-        return newChunk->data;
+        return HASH_CHUNK_DATA(newChunk);
    }
 
    /*
@@ -2745,8 +2750,7 @@ static void* dense_alloc(HashJoinTable hashtable, Size size)
     */
    if ((hashtable->chunks == NULL) || (hashtable->chunks->maxlen - hashtable->chunks->used) < size) {
        /* allocate new chunk and put it at the beginning of the list */
-        newChunk = (HashMemoryChunk)MemoryContextAlloc(
-            hashtable->batchCxt, offsetof(HashMemoryChunkData, data) + HASH_CHUNK_SIZE);
+        newChunk = (HashMemoryChunk)MemoryContextAlloc(hashtable->batchCxt, HASH_CHUNK_HEADER_SIZE + HASH_CHUNK_SIZE);
 
        newChunk->maxlen = HASH_CHUNK_SIZE;
        newChunk->used = size;
@@ -2755,11 +2759,11 @@ static void* dense_alloc(HashJoinTable hashtable, Size size)
        newChunk->next.unshared = hashtable->chunks;
        hashtable->chunks = newChunk;
 
-        return newChunk->data;
+        return HASH_CHUNK_DATA(newChunk);
    }
 
    /* There is enough space in the current chunk, let's add the tuple */
-    ptr = hashtable->chunks->data + hashtable->chunks->used;
+    ptr = HASH_CHUNK_DATA(hashtable->chunks) + hashtable->chunks->used;
    hashtable->chunks->used += size;
    hashtable->chunks->ntuples += 1;
 
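Taken together, the dense_alloc hunks above leave the oversize-tuple branch looking roughly like this (reassembled from the added lines, with the chunk-list linking elided): the chunk is fully accounted for at allocation time, so the shared used/ntuples increments that previously followed are no longer applied on this path.

    if (size > HASH_CHUNK_THRESHOLD) {
        /* a tuple above the threshold gets its own chunk, accounted for up front */
        newChunk = (HashMemoryChunk)MemoryContextAlloc(hashtable->batchCxt, HASH_CHUNK_HEADER_SIZE + size);
        newChunk->maxlen = size;
        newChunk->used = size;
        newChunk->ntuples = 1;

        /* ... link newChunk into hashtable->chunks as shown in the hunks above ... */

        return HASH_CHUNK_DATA(newChunk);
    }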
@@ -2796,7 +2800,7 @@ static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t
    if (chunk != NULL && size < HASH_CHUNK_THRESHOLD && chunk->maxlen - chunk->used >= size) {
        chunk_shared = (HashMemoryChunk)hashtable->current_chunk_shared;
        Assert(chunk == dsa_get_address(hashtable->area, chunk_shared));
-        HashJoinTuple result = (HashJoinTuple)(chunk->data + chunk->used);
+        HashJoinTuple result = (HashJoinTuple)(HASH_CHUNK_DATA(chunk) + chunk->used);
        *shared = result;
        chunk->used += size;
        Assert(chunk->used <= chunk->maxlen);
@@ -2875,7 +2879,7 @@ static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t
    chunk_shared = chunk_shared;
    chunk_shared->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE;
    chunk_shared->used = size;
-    *shared = (HashJoinTuple)chunk_shared->data;
+    *shared = (HashJoinTuple)HASH_CHUNK_DATA(chunk_shared);
 
    /*
     * Push it onto the list of chunks, so that it can be found if we need to
@@ -2894,8 +2898,8 @@ static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t
        hashtable->current_chunk_shared = chunk_shared;
    }
    LWLockRelease(&pstate->lock);
-    Assert(chunk_shared->data == (void*)dsa_get_address(hashtable->area, *shared));
-    return (HashJoinTuple)chunk_shared->data;
+    Assert(HASH_CHUNK_DATA(chunk_shared) == (void*)dsa_get_address(hashtable->area, *shared));
+    return (HashJoinTuple)HASH_CHUNK_DATA(chunk_shared);
}
 
/*
@@ -3142,11 +3146,8 @@ void ExecHashTableDetach(HashJoinTable hashtable)
 */
static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
{
-    HashJoinTuple tuple;
-    ParallelHashJoinState* parallelState = hashtable->parallel_state;
-    Assert(parallelState);
-    tuple = (HashJoinTuple)dsa_get_address(hashtable->area, hashtable->buckets.shared[bucketno]);
-    return tuple;
+    Assert(hashtable->parallel_state);
+    return (HashJoinTuple)dsa_get_address(hashtable->area, hashtable->buckets.shared[bucketno]);
}
 
/*