Increase the number of hashjoin buckets when necessary

wanghao19920907
2023-02-06 00:57:37 -08:00
parent 6179070ae2
commit c69efe24e4
7 changed files with 154 additions and 32 deletions


@@ -50,6 +50,7 @@
#include "workload/workload.h"
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash* node, int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable, TupleTableSlot* slot, uint32 hashvalue, int bucketNumber);
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
@@ -128,6 +129,7 @@ Node* MultiExecHash(HashState* node)
if (bucketNumber != INVALID_SKEW_BUCKET_NO) {
/* It's a skew tuple, so put it into that hash table */
ExecHashSkewTableInsert(hashtable, slot, hashvalue, bucketNumber);
hashtable->skewTuples += 1;
} else {
/* Not subject to skew optimization, so insert normally */
ExecHashTableInsert(hashtable,
@@ -142,13 +144,29 @@ Node* MultiExecHash(HashState* node)
}
(void)pgstat_report_waitstatus(oldStatus);
/* analyze hash table information for unique sql hash state */
UpdateUniqueSQLHashStats(hashtable, &start_time);
/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
if (hashtable->nbuckets != hashtable->nbuckets_optimal) {
/* We never decrease the number of buckets. */
Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
#ifdef HJDEBUG
printf("Increasing nbuckets %d => %d\n", hashtable->nbuckets, hashtable->nbuckets_optimal);
#endif
ExecHashIncreaseNumBuckets(hashtable);
}
/* analyze hash table information created in memory */
if (anls_opt_is_on(ANLS_HASH_CONFLICT))
ExecHashTableStats(hashtable, node->ps.plan->plan_node_id);
/* analyze hash table information for unique sql hash state */
UpdateUniqueSQLHashStats(hashtable, &start_time);
/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
if (hashtable->spaceUsed > hashtable->spacePeak)
hashtable->spacePeak = hashtable->spaceUsed;
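/*
 * Illustrative aside (not part of this commit): HashJoinTuple is a pointer
 * typedef in PostgreSQL-derived code, so this charges nbuckets pointer-sized
 * slots to spaceUsed. Assuming 8-byte pointers, a table that grew to
 * 1,048,576 buckets accounts for roughly 8 MB here.
 */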
/* must provide our own instrumentation support */
if (node->ps.instrument) {
@@ -320,7 +338,10 @@ HashJoinTable ExecHashTableCreate(Hash* node, List* hashOperators, bool keepNull
*/
hashtable = (HashJoinTable)palloc(sizeof(HashJoinTableData));
hashtable->nbuckets = nbuckets;
hashtable->nbuckets_original = nbuckets;
hashtable->nbuckets_optimal = nbuckets;
hashtable->log2_nbuckets = log2_nbuckets;
hashtable->log2_nbuckets_optimal = log2_nbuckets;
hashtable->buckets = NULL;
hashtable->keepNulls = keepNulls;
hashtable->skewEnabled = false;
@@ -334,6 +355,7 @@ HashJoinTable ExecHashTableCreate(Hash* node, List* hashOperators, bool keepNull
hashtable->nbatch_outstart = nbatch;
hashtable->growEnabled = true;
hashtable->totalTuples = 0;
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
hashtable->spaceUsed = 0;
@@ -999,6 +1021,18 @@ static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
*/
ninmemory = nfreed = 0;
/* If we know we need to resize nbuckets, we can do it while rebatching. */
if (hashtable->nbuckets_optimal != hashtable->nbuckets) {
/* we never decrease the number of buckets */
Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
hashtable->nbuckets = hashtable->nbuckets_optimal;
hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
hashtable->buckets = (struct HashJoinTupleData**) repalloc(
hashtable->buckets, sizeof(HashJoinTuple) * hashtable->nbuckets);
}
/*
* We will scan through the chunks directly, so that we can reset the
* buckets now and not have to keep track which tuples in the buckets have
@@ -1080,6 +1114,73 @@ static void ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
}
/*
* ExecHashIncreaseNumBuckets
* increase the original number of buckets in order to reduce
* number of tuples per bucket
*/
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
{
HashMemoryChunk chunk;
/* do nothing if not an increase (it's called increase for a reason) */
if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
return;
/*
* We already know the optimal number of buckets, so let's just
* compute the log2_nbuckets for it.
*/
hashtable->nbuckets = hashtable->nbuckets_optimal;
hashtable->log2_nbuckets = my_log2(hashtable->nbuckets_optimal);
Assert(hashtable->nbuckets > 1);
Assert(hashtable->nbuckets <= (INT_MAX / 2));
Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
#ifdef HJDEBUG
printf("Increasing nbuckets to %d\n", hashtable->nbuckets);
#endif
/*
* Just reallocate the proper number of buckets - we don't need to
* walk through them - we can walk the dense-allocated chunks
* (just like in ExecHashIncreaseNumBatches, but without all the
* copying into new chunks)
*/
hashtable->buckets = (HashJoinTuple *)repalloc(hashtable->buckets, hashtable->nbuckets * sizeof(HashJoinTuple));
memset_s(hashtable->buckets,
sizeof(void *) * hashtable->nbuckets,
0,
sizeof(void *) * hashtable->nbuckets);
/* scan through all tuples in all chunks to rebuild the hash table */
for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next) {
/* process all tuples stored in this chunk */
size_t idx = 0;
while (idx < chunk->used) {
HashJoinTuple hashTuple = (HashJoinTuple)(chunk->data + idx);
int bucketno;
int batchno;
ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, &bucketno, &batchno);
/* add the tuple to the proper bucket */
hashTuple->next = hashtable->buckets[bucketno];
hashtable->buckets[bucketno] = hashTuple;
/* advance index past the tuple */
idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len);
}
}
#ifdef HJDEBUG
printf("Nbuckets increased to %d, average items per bucket %.1f\n", hashtable->nbuckets,
batchTuples / hashtable->nbuckets);
#endif
}
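/*
 * For reference (illustrative sketch, not part of this commit): the rebuild
 * loop above only needs bucketno, which ExecHashGetBucketAndBatch derives from
 * the hash value by masking and shifting, roughly as in upstream PostgreSQL:
 *
 *     bucketno = hashvalue & (nbuckets - 1);
 *     batchno  = (nbatch > 1)
 *                ? (hashvalue >> log2_nbuckets) & (nbatch - 1)
 *                : 0;
 *
 * bucketno uses the low log2_nbuckets bits and batchno uses the bits above
 * them, so nbuckets may keep doubling while nbatch == 1 without invalidating
 * any batch number computed so far.
 */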
/*
* ExecHashTableInsert
* insert a tuple into the hash table depending on the hash value
@@ -1110,6 +1211,7 @@ void ExecHashTableInsert(
*/
HashJoinTuple hashTuple;
int hashTupleSize;
double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
/* Create the HashJoinTuple */
hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
@@ -1136,13 +1238,24 @@ void ExecHashTableInsert(
hashtable->width[1] += tuple->t_len;
}
/*
* Increase the (optimal) number of buckets if we just exceeded the
* NTUP_PER_BUCKET threshold, but only when there's still a single batch.
*/
if ((hashtable->nbatch == 1) && (hashtable->nbuckets_optimal <= INT_MAX / 2) && /* overflow protection */
(ntuples >= (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))) {
hashtable->nbuckets_optimal *= 2;
hashtable->log2_nbuckets_optimal += 1;
}
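/*
 * Worked example (illustrative, not part of this commit): assuming
 * NTUP_PER_BUCKET is 1 (the value upstream PostgreSQL adopted alongside this
 * optimization), once ntuples reaches a nbuckets_optimal of 1024 the check
 * fires, nbuckets_optimal doubles to 2048 and log2_nbuckets_optimal goes from
 * 10 to 11. The bucket array itself is reallocated later, either in
 * MultiExecHash or while rebatching.
 */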
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
if (hashtable->spaceUsed > hashtable->spacePeak) {
hashtable->spacePeak = hashtable->spaceUsed;
}
bool sysBusy = gs_sysmemory_busy(hashtable->spaceUsed * dop, false);
if (hashtable->spaceUsed > hashtable->spaceAllowed || sysBusy) {
if (hashtable->spaceUsed + int64(hashtable->nbuckets_optimal * sizeof(HashJoinTuple)) > hashtable->spaceAllowed
|| sysBusy) {
AllocSetContext* set = (AllocSetContext*)(hashtable->hashCxt);
if (sysBusy) {
hashtable->causedBySysRes = true;
@@ -1309,7 +1422,10 @@ bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext* econtext, List*
* functions are good about randomizing all their output bits, else we are
* likely to have very skewed bucket or batch occupancy.)
*
* nbuckets doesn't change over the course of the join.
* nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic
* bucket count growth. Once we start batching, the value is fixed and does
* not change over the course of the join (making it possible to compute batch
* number the way we do here).
*
* nbatch is always a power of 2; we increase it only by doubling it. This
* effectively adds one more bit to the top of the batchno.