From c85182e24cbbb6e0f7fb6ddee183e89071ac01ac Mon Sep 17 00:00:00 2001
From: cc_db_dev
Date: Thu, 23 Feb 2023 19:50:17 +0800
Subject: [PATCH] Improve external sort performance
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Replace the per-tape tuple preloading scheme in the external sort merge
phase, following the upstream PostgreSQL approach: give each input tape a
large sequential read buffer, and keep the small, fixed number of tuples
needed during a merge in a slab of SLAB_SLOT_SIZE slots (one per input
tape, plus one for the tuple last returned to the caller) instead of
palloc'ing and pfree'ing every tuple. The tuplesort_get* functions no
longer return memory that the caller must free.

---
 src/common/backend/utils/sort/logtape.cpp   |  25 +
 src/common/backend/utils/sort/tuplesort.cpp | 740 +++++++++++-------
 .../optimizer/commands/cluster.cpp          |  22 +-
 .../storage/access/hash/hashsort.cpp        |   5 +-
 .../storage/access/nbtree/nbtsort.cpp       |  24 +-
 .../storage/access/ubtree/ubtsort.cpp       |  24 +-
 src/include/utils/logtape.h                 |   1 +
 src/include/utils/tuplesort.h               |   4 +-
 src/test/regress/pg_regress.cpp             |   2 +-
 9 files changed, 485 insertions(+), 362 deletions(-)

diff --git a/src/common/backend/utils/sort/logtape.cpp b/src/common/backend/utils/sort/logtape.cpp
index a4365d17e..4709d0b88 100644
--- a/src/common/backend/utils/sort/logtape.cpp
+++ b/src/common/backend/utils/sort/logtape.cpp
@@ -144,6 +144,7 @@ typedef struct LogicalTape {
     int max_size; /* highest useful, safe buffer_size */
     int pos;      /* next read/write position in buffer */
     int nbytes;   /* total # of valid bytes in buffer */
+    int read_buffer_size;
 } LogicalTape;

 /*
@@ -979,3 +980,27 @@ long LogicalTapeSetBlocks(LogicalTapeSet *lts)
 {
     return lts->nBlocksAllocated - lts->nHoleBlocks;
 }
+
+/*
+ * Set buffer size to use, when reading from given tape.
+ */
+void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
+{
+    LogicalTape *lt;
+
+    Assert(tapenum >= 0 && tapenum < lts->nTapes);
+    lt = &lts->tapes[tapenum];
+
+    /*
+     * The buffer size must be a multiple of BLCKSZ in size, so round the
+     * given value down to nearest BLCKSZ. Make sure we have at least one
+     * page. Also, don't go above MaxAllocSize, to avoid erroring out. A
+     * multi-gigabyte buffer is unlikely to be helpful, anyway.
+     */
+    if (avail_mem < BLCKSZ)
+        avail_mem = BLCKSZ;
+    if (avail_mem > MaxAllocSize)
+        avail_mem = MaxAllocSize;
+    avail_mem -= avail_mem % BLCKSZ;
+    lt->read_buffer_size = avail_mem;
+}
diff --git a/src/common/backend/utils/sort/tuplesort.cpp b/src/common/backend/utils/sort/tuplesort.cpp
index 3d3d2740c..f40ea41b6 100644
--- a/src/common/backend/utils/sort/tuplesort.cpp
+++ b/src/common/backend/utils/sort/tuplesort.cpp
@@ -195,6 +195,14 @@ typedef struct {
     int tupindex; /* see notes above */
 } SortTuple;

+#define SLAB_SLOT_SIZE 1024
+
+typedef union SlabSlot
+{
+    union SlabSlot *nextfree;
+    char buffer[SLAB_SLOT_SIZE];
+} SlabSlot;
+
 /*
  * Possible states of a Tuplesort object. These denote the states that
  * persist between calls of Tuplesort routines.
@@ -236,11 +244,13 @@ struct Tuplesortstate {
                      * tuples to return? */
     bool boundUsed; /* true if we made use of a bounded heap */
     int bound;      /* if bounded, the maximum number of tuples */
+    bool tuples;    /* Can SortTuple.tuple ever be set?
*/ int64 availMem; /* remaining memory available, in bytes */ int64 allowedMem; /* total memory allowed, in bytes */ int maxTapes; /* number of tapes (Knuth's T) */ int tapeRange; /* maxTapes-1 (Knuth's P) */ MemoryContext sortcontext; /* memory context holding all sort data */ + MemoryContext tuplecontext; /* memory context holding tuple data */ LogicalTapeSet* tapeset; /* logtape.c object for tapes in a temp file */ #ifdef PGXC Oid current_xcnode; /* node from where we are got last tuple */ @@ -304,6 +314,47 @@ struct Tuplesortstate { int memtupsize; /* allocated length of memtuples array */ bool growmemtuples; /* memtuples' growth still underway? */ + /* + * Memory for tuples is sometimes allocated using a simple slab allocator, + * rather than with palloc(). Currently, we switch to slab allocation + * when we start merging. Merging only needs to keep a small, fixed + * number of tuples in memory at any time, so we can avoid the + * palloc/pfree overhead by recycling a fixed number of fixed-size slots + * to hold the tuples. + * + * For the slab, we use one large allocation, divided into SLAB_SLOT_SIZE + * slots. The allocation is sized to have one slot per tape, plus one + * additional slot. We need that many slots to hold all the tuples kept + * in the heap during merge, plus the one we have last returned from the + * sort, with tuplesort_gettuple. + * + * Initially, all the slots are kept in a linked list of free slots. When + * a tuple is read from a tape, it is put to the next available slot, if + * it fits. If the tuple is larger than SLAB_SLOT_SIZE, it is palloc'd + * instead. + * + * When we're done processing a tuple, we return the slot back to the free + * list, or pfree() if it was palloc'd. We know that a tuple was + * allocated from the slab, if its pointer value is between + * slabMemoryBegin and -End. + * + * When the slab allocator is used, the USEMEM/LACKMEM mechanism of + * tracking memory usage is not used. + */ + bool slabAllocatorUsed; + + char *slabMemoryBegin; /* beginning of slab memory arena */ + char *slabMemoryEnd; /* end of slab memory arena */ + SlabSlot *slabFreeHead; /* head of free list */ + + /* + * When we return a tuple to the caller in tuplesort_gettuple_XXX, that + * came from a tape (that is, in TSS_SORTEDONTAPE or TSS_FINALMERGE + * modes), we remember the tuple in 'lastReturnedTuple', so that we can + * recycle the memory on next gettuple call. + */ + void *lastReturnedTuple; + /* * While building initial runs, this is the current output run number * (starting at 0). Afterwards, it is the number of initial runs we made. @@ -331,12 +382,6 @@ struct Tuplesortstate { * memtuples[0] is part of the merge heap and is never a pre-read tuple. */ bool* mergeactive; /* active input run source? */ - int* mergenext; /* first preread tuple for each source */ - int* mergelast; /* last preread tuple for each source */ - int* mergeavailslots; /* slots left for prereading each tape */ - long* mergeavailmem; /* availMem for prereading each tape */ - int mergefreelist; /* head of freelist of recycled slots */ - int mergefirstfree; /* first slot never used in this merge */ /* * Variables for Algorithm D. Note that destTape is a "logical" tape @@ -436,7 +481,6 @@ struct Tuplesortstate { Oid datumType; /* we need typelen and byval in order to know how to copy the Datums. 
*/ int datumTypeLen; - bool datumTypeByVal; // merge sort in remotequery RemoteQueryState* combiner; /* tuple source, alternate to tapeset */ @@ -467,6 +511,29 @@ struct Tuplesortstate { uint64 spill_count; /* the times of spilling to disk */ }; +/* + * Is the given tuple allocated from the slab memory arena? + */ +#define IS_SLAB_SLOT(state, tuple) \ + ((char *) (tuple) >= (state)->slabMemoryBegin && \ + (char *) (tuple) < (state)->slabMemoryEnd) + +/* + * Return the given tuple to the slab memory free list, or free it + * if it was palloc'd. + */ +#define RELEASE_SLAB_SLOT(state, tuple) \ + do { \ + SlabSlot *buf = (SlabSlot *) tuple; \ + \ + if (IS_SLAB_SLOT((state), buf)) \ + { \ + buf->nextfree = (state)->slabFreeHead; \ + (state)->slabFreeHead = buf; \ + } else \ + pfree(buf); \ + } while(0) + #define COMPARETUP(state, a, b) ((*(state)->comparetup)(a, b, state)) #define COPYTUP(state, stup, tup) ((*(state)->copytup)(state, stup, tup)) #define WRITETUP(state, tape, stup) ((*(state)->writetup)(state, tape, stup)) @@ -486,7 +553,8 @@ static bool LACKMEM(Tuplesortstate* state) { int64 usedMem = state->allowedMem - state->availMem; - if (state->availMem < 0 || gs_sysmemory_busy(usedMem * state->dop, true)) + if ((state->availMem < 0 && !state->slabAllocatorUsed) || + gs_sysmemory_busy(usedMem * state->dop, true)) return true; return false; @@ -585,7 +653,13 @@ static bool AutoSpreadMem(Tuplesortstate* state, double* growRatio) * rather than the originally-requested size. This is important since * palloc can add substantial overhead. It's not a complete answer since * we won't count any wasted space in palloc allocation blocks, but it's - * a lot better than what we were doing before 7.3. + * a lot better than what we were doing before 7.3. As of 9.6, a + * separate memory context is used for caller passed tuples. Resetting + * it at certain key increments significantly ameliorates fragmentation. + * Note that this places a responsibility on readtup and copytup routines + * to use the right memory context for these tuples (and to not use the + * reset context for anything whose lifetime needs to span multiple + * external sort runs). 
*/ /* When using this macro, beware of double evaluation of len */ @@ -602,11 +676,12 @@ static bool consider_abort_common(Tuplesortstate* state); static void inittapes(Tuplesortstate* state, bool mergeruns); static void inittapestate(Tuplesortstate *state, int maxTapes); static void selectnewtape(Tuplesortstate* state); +static void init_slab_allocator(Tuplesortstate *state, int numSlots); +static void init_tape_buffers(Tuplesortstate *state, int numInputTapes); static void mergeruns(Tuplesortstate* state); static void mergeonerun(Tuplesortstate* state); -static void beginmerge(Tuplesortstate* state); -static void mergepreread(Tuplesortstate* state); -static void mergeprereadone(Tuplesortstate* state, int srcTape); +static void beginmerge(Tuplesortstate *state); +static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup); static void dumptuples(Tuplesortstate* state, bool alltuples); static void make_bounded_heap(Tuplesortstate* state); static void sort_bounded_heap(Tuplesortstate* state); @@ -615,6 +690,7 @@ static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple); static void tuplesort_heap_delete_top(Tuplesortstate* state); static unsigned int getlen(Tuplesortstate* state, int tapenum, bool eofOK); static void markrunend(Tuplesortstate* state, int tapenum); +static void *readtup_alloc(Tuplesortstate *state, Size tuplen); static int comparetup_heap(const SortTuple* a, const SortTuple* b, Tuplesortstate* state); static void copytup_heap(Tuplesortstate* state, SortTuple* stup, void* tup); static void writetup_heap(Tuplesortstate* state, int tapenum, SortTuple* stup); @@ -697,6 +773,7 @@ static Tuplesortstate* tuplesort_begin_common(int64 workMem, bool randomAccess, { Tuplesortstate* state = NULL; MemoryContext sortcontext; + MemoryContext tuplecontext; MemoryContext oldcontext; /* See leader_takeover_tapes() remarks on randomAccess support */ @@ -707,8 +784,16 @@ static Tuplesortstate* tuplesort_begin_common(int64 workMem, bool randomAccess, * Create a working memory context for this sort operation. All data * needed by the sort will live inside this context. */ - sortcontext = AllocSetContextCreate(CurrentMemoryContext, "TupleSort", ALLOCSET_DEFAULT_MINSIZE, + sortcontext = AllocSetContextCreate(CurrentMemoryContext, "TupleSort main", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE, STANDARD_CONTEXT, workMem * 1024L); + + tuplecontext = AllocSetContextCreate(sortcontext, + "Caller tuples", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE, + STANDARD_CONTEXT, + workMem * 1024L); /* * Make the Tuplesortstate within the per-sort context. 
This way, we @@ -726,15 +811,18 @@ static Tuplesortstate* tuplesort_begin_common(int64 workMem, bool randomAccess, state->status = TSS_INITIAL; state->randomAccess = randomAccess; state->bounded = false; + state->tuples = true; state->boundUsed = false; state->allowedMem = Max(workMem, 64) * (int64) 1024; state->availMem = state->allowedMem; state->sortcontext = sortcontext; + state->tuplecontext = tuplecontext; state->tapeset = NULL; state->memtupcount = 0; state->memtupsize = 1024; /* initial guess */ state->growmemtuples = true; + state->slabAllocatorUsed = false; state->memtuples = (SortTuple*)palloc(state->memtupsize * sizeof(SortTuple)); USEMEM(state, GetMemoryChunkSpace(state->memtuples)); @@ -1066,7 +1154,7 @@ Tuplesortstate* tuplesort_begin_datum( /* lookup necessary attributes of the datum type */ get_typlenbyval(datumType, &typlen, &typbyval); state->datumTypeLen = typlen; - state->datumTypeByVal = typbyval; + state->tuples = !typbyval; (void)MemoryContextSwitchTo(oldcontext); @@ -1448,7 +1536,7 @@ void TuplesortPutheaptuple(Tuplesortstate* state, HeapTuple tup) void tuplesort_putindextuplevalues( Tuplesortstate* state, Relation rel, ItemPointer self, Datum* values, const bool* isnull) { - MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); SortTuple stup; stup.tupindex = 0; stup.tuple = index_form_tuple(RelationGetDescr(rel), values, isnull); @@ -1457,6 +1545,8 @@ void tuplesort_putindextuplevalues( USEMEM(state, GetMemoryChunkSpace(stup.tuple)); /* set up first-column key value */ stup.datum1 = index_getattr((IndexTuple)stup.tuple, 1, RelationGetDescr(state->indexRel), &stup.isnull1); + + MemoryContextSwitchTo(state->sortcontext); puttuple_common(state, &stup); (void)MemoryContextSwitchTo(oldcontext); @@ -1469,7 +1559,7 @@ void tuplesort_putindextuplevalues( */ void tuplesort_putdatum(Tuplesortstate* state, Datum val, bool isNull) { - MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext); + MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext); SortTuple stup; stup.tupindex = 0; @@ -1477,15 +1567,17 @@ void tuplesort_putdatum(Tuplesortstate* state, Datum val, bool isNull) * If it's a pass-by-reference value, copy it into memory we control, and * decrease availMem. Then call the common code. */ - if (isNull || state->datumTypeByVal) { + if (isNull || !state->tuples) { stup.datum1 = val; stup.isnull1 = isNull; stup.tuple = NULL; /* no separate storage */ + MemoryContextSwitchTo(state->sortcontext); } else { stup.datum1 = datumCopy(val, false, state->datumTypeLen); stup.isnull1 = false; stup.tuple = DatumGetPointer(stup.datum1); USEMEM(state, GetMemoryChunkSpace(stup.tuple)); + MemoryContextSwitchTo(state->sortcontext); } puttuple_common(state, &stup); @@ -1802,9 +1894,10 @@ void tuplesort_performsort(Tuplesortstate* state) /* * Internal routine to fetch the next tuple in either forward or back * direction into *stup. Returns FALSE if no more tuples. - * If *should_free is set, the caller must pfree stup.tuple when done with it. + * Returned tuple belongs to tuplesort memory context, and must not be freed + * by caller. Caller should not use tuple following next call here. 
*/ -static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortTuple* stup, bool* should_free) +static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortTuple* stup) { unsigned int tuplen; @@ -1813,7 +1906,7 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT switch (state->status) { case TSS_SORTEDINMEM: Assert(forward || state->randomAccess); - *should_free = false; + Assert(!state->slabAllocatorUsed); if (forward) { if (state->current < state->memtupcount) { *stup = state->memtuples[state->current++]; @@ -1858,7 +1951,17 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT case TSS_SORTEDONTAPE: Assert(forward || state->randomAccess); - *should_free = true; + Assert(state->slabAllocatorUsed); + + /* + * The slot that held the tuple that we returned in previous + * gettuple call can now be reused. + */ + if (state->lastReturnedTuple) { + RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); + state->lastReturnedTuple = NULL; + } + if (forward) { if (state->eof_reached) { return false; @@ -1866,6 +1969,7 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT if ((tuplen = getlen(state, state->result_tape, true)) != 0) { READTUP(state, stup, state->result_tape, tuplen); + state->lastReturnedTuple = stup->tuple; return true; } else { state->eof_reached = true; @@ -1926,60 +2030,54 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT (errmodule(MOD_EXECUTOR), (errcode(ERRCODE_FILE_READ_FAILED), errmsg("bogus tuple length in backward scan")))); READTUP(state, stup, state->result_tape, tuplen); + state->lastReturnedTuple = stup->tuple; return true; case TSS_FINALMERGE: Assert(forward); - *should_free = true; + Assert(state->slabAllocatorUsed); + + if (state->lastReturnedTuple) { + RELEASE_SLAB_SLOT(state, state->lastReturnedTuple); + state->lastReturnedTuple = NULL; + } /* * This code should match the inner loop of mergeonerun(). */ if (state->memtupcount > 0) { int srcTape = state->memtuples[0].tupindex; - Size tuplength; - int tupIndex; - SortTuple* newtup = NULL; - + SortTuple newtup; *stup = state->memtuples[0]; - /* returned tuple is no longer counted in our memory space */ - if (stup->tuple != NULL) { - tuplength = GetMemoryChunkSpace(stup->tuple); - state->availMem += tuplength; - state->mergeavailmem[srcTape] += tuplength; - } - if ((tupIndex = state->mergenext[srcTape]) == 0) { + + /* + * Remember the tuple we return, so that we can recycle its + * memory on next call. (This can be NULL, in the Datum case). + */ + state->lastReturnedTuple = stup->tuple; + + + /* + * Pull next tuple from tape, and replace the returned tuple + * at top of the heap with it. + */ + if (!mergereadnext(state, srcTape, &newtup)) { /* - * out of preloaded data on this tape, try to read more - * - * Unlike mergeonerun(), we only preload from the single - * tape that's run dry. See mergepreread() comments. + * If no more data, we've reached end of run on this tape. + * Remove the top node from the heap. */ - mergeprereadone(state, srcTape); + tuplesort_heap_delete_top(state); /* - * if still no data, we've reached end of run on this tape + * Rewind to free the read buffer. It'd go away at the + * end of the sort anyway, but better to release the + * memory early. 
 */
-                if ((tupIndex = state->mergenext[srcTape]) == 0) {
-                    /* Remove the top node from the heap */
-                    tuplesort_heap_delete_top(state);
-                    return true;
-                }
+                LogicalTapeRewindForWrite(state->tapeset, srcTape);
+                return true;
             }
-            /*
-             * pull next preread tuple from list, and replace the returned
-             * tuple at top of the heap with it.
-             */
-            newtup = &state->memtuples[tupIndex];
-            state->mergenext[srcTape] = newtup->tupindex;
-            if (state->mergenext[srcTape] == 0)
-                state->mergelast[srcTape] = 0;
-            newtup->tupindex = srcTape;
-            tuplesort_heap_replace_top(state, newtup);
-            /* put the now-unused memtuples entry on the freelist */
-            newtup->tupindex = state->mergefreelist;
-            state->mergefreelist = tupIndex;
-            state->mergeavailslots[srcTape]++;
+            newtup.tupindex = srcTape;
+            tuplesort_heap_replace_top(state, &newtup);
             return true;
         }
         return false;
@@ -2007,9 +2105,8 @@ bool tuplesort_gettupleslot(Tuplesortstate* state, bool forward, TupleTableSlot*
 {
     MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
     SortTuple stup;
-    bool should_free = false;

-    if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+    if (!tuplesort_gettuple_common(state, forward, &stup))
         stup.tuple = NULL;
     (void)MemoryContextSwitchTo(oldcontext);

@@ -2019,7 +2116,7 @@ bool tuplesort_gettupleslot(Tuplesortstate* state, bool forward, TupleTableSlot*
         if (state->sortKeys->abbrev_converter && abbrev)
             *abbrev = stup.datum1;

-        ExecStoreMinimalTuple((MinimalTuple)stup.tuple, slot, should_free);
+        ExecStoreMinimalTuple((MinimalTuple)stup.tuple, slot, false);
         return true;
     } else {
         (void)ExecClearTuple(slot);
@@ -2042,11 +2139,10 @@ bool tuplesort_gettupleslot_into_tuplestore(
 {
     MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
     SortTuple stup;
-    bool should_free = false;

     Assert(tstate != NULL);

-    if (!tuplesort_gettuple_common(state, forward, &stup, &should_free))
+    if (!tuplesort_gettuple_common(state, forward, &stup))
         stup.tuple = NULL;
     (void)MemoryContextSwitchTo(oldcontext);

@@ -2056,7 +2152,7 @@ bool tuplesort_gettupleslot_into_tuplestore(
         if (state->sortKeys->abbrev_converter && abbrev)
             *abbrev = stup.datum1;

-        ExecStoreMinimalTuple((MinimalTuple)stup.tuple, slot, should_free);
+        ExecStoreMinimalTuple((MinimalTuple)stup.tuple, slot, false);

         /* tuple in tuplesort will be cleared immediately, so we put it into tuplestore too, to let it be saved */
         tuplestore_puttupleslot(tstate, slot);
@@ -2073,12 +2169,12 @@ bool tuplesort_gettupleslot_into_tuplestore(
- * Returns NULL if no more tuples. If *should_free is set, the
- * caller must pfree the returned tuple when done with it.
+ * Returns NULL if no more tuples. The returned tuple belongs to the
+ * tuplesort memory context and must not be freed by the caller; it is
+ * only valid until the next fetch from this tuplesort.
  */
-void* tuplesort_getheaptuple(Tuplesortstate* state, bool forward, bool* should_free)
+void* tuplesort_getheaptuple(Tuplesortstate* state, bool forward)
 {
     MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
     SortTuple stup;

-    if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+    if (!tuplesort_gettuple_common(state, forward, &stup))
         stup.tuple = NULL;

     (void)MemoryContextSwitchTo(oldcontext);
@@ -2091,12 +2187,12 @@ void* tuplesort_getheaptuple(Tuplesortstate* state, bool forward, bool* should_f
- * Returns NULL if no more tuples. If *should_free is set, the
- * caller must pfree the returned tuple when done with it.
+ * Returns NULL if no more tuples. The returned tuple belongs to the
+ * tuplesort memory context and must not be freed by the caller; it is
+ * only valid until the next fetch from this tuplesort.
 */
-IndexTuple tuplesort_getindextuple(Tuplesortstate* state, bool forward, bool* should_free)
+IndexTuple tuplesort_getindextuple(Tuplesortstate* state, bool forward)
 {
     MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
     SortTuple stup;

-    if (!tuplesort_gettuple_common(state, forward, &stup, should_free))
+    if (!tuplesort_gettuple_common(state, forward, &stup))
         stup.tuple = NULL;

     (void)MemoryContextSwitchTo(oldcontext);
@@ -2115,21 +2211,17 @@ bool tuplesort_getdatum(Tuplesortstate* state, bool forward, Datum* val, bool* i
 {
     MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
     SortTuple stup;
-    bool should_free = false;

-    if (!tuplesort_gettuple_common(state, forward, &stup, &should_free)) {
+    if (!tuplesort_gettuple_common(state, forward, &stup)) {
         (void)MemoryContextSwitchTo(oldcontext);
         return false;
     }

-    if (stup.isnull1 || state->datumTypeByVal) {
+    if (stup.isnull1 || !state->tuples) {
         *val = stup.datum1;
         *isNull = stup.isnull1;
     } else {
-        if (should_free)
-            *val = stup.datum1;
-        else
-            *val = datumCopy(stup.datum1, false, state->datumTypeLen);
+        *val = datumCopy(stup.datum1, false, state->datumTypeLen);
         *isNull = false;
     }

@@ -2161,16 +2253,12 @@ bool tuplesort_skiptuples(Tuplesortstate* state, int64 ntuples, bool forward)
     oldcontext = MemoryContextSwitchTo(state->sortcontext);
     for (int i = 0; i < ntuples; i++) {
         SortTuple stup;
-        bool should_free = false;

         stup.tuple = NULL;
-        if (!tuplesort_gettuple_common(state, forward, &stup, &should_free)) {
+        if (!tuplesort_gettuple_common(state, forward, &stup)) {
             (void)MemoryContextSwitchTo(oldcontext);
             return false;
         }
-        /* stup.tuple may be null there */
-        if (should_free && stup.tuple)
-            pfree(stup.tuple);
         /* allowed to be canceled */
         CHECK_FOR_INTERRUPTS();
     }
@@ -2244,13 +2332,6 @@ static void inittapes(Tuplesortstate* state, bool mergeruns)
         maxTapes = MINORDER + 1;
     }

-    /*
-     * We must have at least 2*maxTapes slots in the memtuples[] array, else
-     * we'd not have room for merge heap plus preread. It seems unlikely that
-     * this case would ever occur, but be safe.
-     */
-    maxTapes = Min(maxTapes, state->memtupsize / 2);
-
 #ifdef TRACE_SORT
     if (u_sess->attr.attr_common.trace_sort) {
         elog(LOG, "%d switching to external sort with %d tapes: %s",
@@ -2310,10 +2391,6 @@ static void inittapestate(Tuplesortstate *state, int maxTapes)
     PrepareTempTablespaces();

     state->mergeactive = (bool *)palloc0(maxTapes * sizeof(bool));
-    state->mergenext = (int*)palloc0(maxTapes * sizeof(int));
-    state->mergelast = (int*)palloc0(maxTapes * sizeof(int));
-    state->mergeavailslots = (int*)palloc0(maxTapes * sizeof(int));
-    state->mergeavailmem = (long*)palloc0(maxTapes * sizeof(long));
     state->tp_fib = (int *)palloc0(maxTapes * sizeof(int));
     state->tp_runs = (int *)palloc0(maxTapes * sizeof(int));
     state->tp_dummy = (int *)palloc0(maxTapes * sizeof(int));
@@ -2357,6 +2434,97 @@ static void selectnewtape(Tuplesortstate* state)
     state->destTape = 0;
 }

+/*
+ * Initialize the slab allocation arena, for the given number of slots.
+ */ +static void +init_slab_allocator(Tuplesortstate *state, int numSlots) +{ + if (numSlots > 0) { + char *p; + int i; + + state->slabMemoryBegin = (char *) palloc(numSlots * SLAB_SLOT_SIZE); + state->slabMemoryEnd = state->slabMemoryBegin + numSlots * SLAB_SLOT_SIZE; + state->slabFreeHead = (SlabSlot *) state->slabMemoryBegin; + USEMEM(state, numSlots * SLAB_SLOT_SIZE); + + p = state->slabMemoryBegin; + for (i = 0; i < numSlots - 1; i++) { + ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE); + p += SLAB_SLOT_SIZE; + } + ((SlabSlot *) p)->nextfree = NULL; + } else { + state->slabMemoryBegin = state->slabMemoryEnd = NULL; + state->slabFreeHead = NULL; + } + state->slabAllocatorUsed = true; +} + +/* + * Divide all remaining work memory (availMem) as read buffers, for all + * the tapes that will be used during the merge. + * + * We use the number of possible *input* tapes here, rather than maxTapes, + * for the calculation. At all times, we'll be reading from at most + * numInputTapes tapes, and one tape is used for output (unless we do an + * on-the-fly final merge, in which case we don't have an output tape). + */ +static void +init_tape_buffers(Tuplesortstate *state, int numInputTapes) +{ + int64 availBlocks; + int64 blocksPerTape; + int remainder; + int tapenum; + + /* + * Divide availMem evenly among the number of input tapes. + */ + availBlocks = state->availMem / BLCKSZ; + blocksPerTape = availBlocks / numInputTapes; + remainder = availBlocks % numInputTapes; + USEMEM(state, availBlocks * BLCKSZ); + +#ifdef TRACE_SORT + if (u_sess->attr.attr_common.trace_sort) + elog(LOG, "using " INT64_FORMAT " KB of memory for read buffers among %d input tapes", + (availBlocks * BLCKSZ) / 1024, numInputTapes); +#endif + + /* + * Use one page per tape, even if we are out of memory. + * tuplesort_merge_order() should've chosen the number of tapes so that + * this can't happen, but better safe than sorry. (This also protects + * from a negative availMem.) + */ + if (blocksPerTape < 1) { + blocksPerTape = 1; + remainder = 0; + } + + /* + * Set the buffers for the tapes. + * + * In a multi-phase merge, the tape that is initially used as an output + * tape, will later be rewound and read from, and should also use a large + * buffer at that point. So we must loop up to maxTapes, not just + * numInputTapes! + * + * If there are fewer runs than tapes, we will set the buffer size also + * for tapes that will go completely unused, but that's harmless. + * LogicalTapeAssignReadBufferSize() doesn't allocate the buffer + * immediately, it just sets the size that will be used, when the tape is + * rewound for read, and the tape isn't empty. + */ + for (tapenum = 0; tapenum < state->maxTapes; tapenum++) { + int64 numBlocks = blocksPerTape + (tapenum < remainder ? 1 : 0); + + LogicalTapeAssignReadBufferSize(state->tapeset, tapenum, numBlocks * BLCKSZ); + } +} + static void mergeruns_tapefreeze(Tuplesortstate* state) { if (!WORKER(state)) { @@ -2375,6 +2543,8 @@ static void mergeruns_tapefreeze(Tuplesortstate* state) static void mergeruns(Tuplesortstate* state) { int tapenum, svTape, svRuns, svDummy; + int numTapes; + int numInputTapes; Assert(state->status == TSS_BUILDRUNS); Assert(state->memtupcount == 0); @@ -2408,6 +2578,82 @@ static void mergeruns(Tuplesortstate* state) state->sortKeys->abbrev_full_comparator = NULL; } + /* + * Reset tuple memory. We've freed all the tuples that we previously + * allocated. We will use the slab allocator from now on. 
+ */ + MemoryContextDelete(state->tuplecontext); + state->tuplecontext = NULL; + + /* + * We no longer need a large memtuples array. (We will allocate a smaller + * one for the heap later.) + */ + FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); + pfree(state->memtuples); + state->memtuples = NULL; + + /* + * If we had fewer runs than tapes, refund the memory that we imagined we + * would need for the tape buffers of the unused tapes. + * + * numTapes and numInputTapes reflect the actual number of tapes we will + * use. Note that the output tape's tape number is maxTapes - 1, so the + * tape numbers of the used tapes are not consecutive, and you cannot just + * loop from 0 to numTapes to visit all used tapes! + */ + if (state->Level == 1) { + numInputTapes = state->currentRun; + numTapes = numInputTapes + 1; + FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD); + } else { + numInputTapes = state->tapeRange; + numTapes = state->maxTapes; + } + + /* + * Initialize the slab allocator. We need one slab slot per input tape, + * for the tuples in the heap, plus one to hold the tuple last returned + * from tuplesort_gettuple. (If we're sorting pass-by-val Datums, + * however, we don't need to do allocate anything.) + * + * From this point on, we no longer use the USEMEM()/LACKMEM() mechanism + * to track memory usage of individual tuples. + */ + if (state->tuples) + init_slab_allocator(state, numInputTapes + 1); + else + init_slab_allocator(state, 0); + + /* + * Use all the spare memory we have available for read buffers for the + * tapes. + * + * We do this only after checking for the case that we produced only one + * initial run, because there is no need to use a large read buffer when + * we're reading from a single tape. With one tape, the I/O pattern will + * be the same regardless of the buffer size. + * + * We don't try to "rebalance" the amount of memory among tapes, when we + * start a new merge phase, even if some tapes can be inactive in the + * phase. That would be hard, because logtape.c doesn't know where one + * run ends and another begins. When a new merge phase begins, and a tape + * doesn't participate in it, its buffer nevertheless already contains + * tuples from the next run on same tape, so we cannot release the buffer. + * That's OK in practice, merge performance isn't that sensitive to the + * amount of buffers used, and most merge phases use all or almost all + * tapes, anyway. + */ + init_tape_buffers(state, numInputTapes); + + /* + * Allocate a new 'memtuples' array, for the heap. It will hold one tuple + * from each input tape. 
+     */
+    state->memtupsize = numInputTapes;
+    state->memtuples = (SortTuple *) palloc(numInputTapes * sizeof(SortTuple));
+    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
+
     /* End of step D2: rewind all output tapes to prepare for merging */
     for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
         LogicalTapeRewindForRead(state->tapeset, tapenum, BLCKSZ);
@@ -2496,6 +2742,11 @@ static void mergeruns(Tuplesortstate* state)
     state->result_tape = state->tp_tapenum[state->tapeRange];
     mergeruns_tapefreeze(state);
     state->status = TSS_SORTEDONTAPE;
+
+    for (tapenum = 0; tapenum < state->maxTapes; tapenum++) {
+        if (tapenum != state->result_tape)
+            LogicalTapeRewindForWrite(state->tapeset, tapenum);
+    }
 }

 /*
@@ -2508,9 +2759,6 @@ static void mergeonerun(Tuplesortstate* state)
 {
     int destTape = state->tp_tapenum[state->tapeRange];
     int srcTape;
-    int tupIndex;
-    SortTuple* tup = NULL;
-    long priorAvail, spaceFreed;

     /*
      * Start the merge by loading one tuple from each active source tape into
@@ -2524,38 +2772,26 @@ static void mergeonerun(Tuplesortstate* state)
      * another one).
      */
     while (state->memtupcount > 0) {
+        SortTuple stup;
         /* write the tuple to destTape */
-        priorAvail = state->availMem;
         srcTape = state->memtuples[0].tupindex;
         WRITETUP(state, destTape, &state->memtuples[0]);
-        /* writetup adjusted total free space, now fix per-tape space */
-        spaceFreed = state->availMem - priorAvail;
-        state->mergeavailmem[srcTape] += spaceFreed;
-        if ((tupIndex = state->mergenext[srcTape]) == 0) {
-            /* out of preloaded data on this tape, try to read more */
-            mergepreread(state);
-            /* if still no data, we've reached end of run on this tape */
-            if ((tupIndex = state->mergenext[srcTape]) == 0) {
-                /* remove the written-out tuple from the heap */
-                tuplesort_heap_delete_top(state);
-                continue;
-            }
+
+        /* recycle the slot of the tuple we just wrote out, for the next read */
+        if (state->memtuples[0].tuple) {
+            RELEASE_SLAB_SLOT(state, state->memtuples[0].tuple);
         }
+
         /*
-         * pull next preread tuple from list, and replace the written-out
-         * tuple in the heap with it.
+         * pull next tuple from the tape, and replace the written-out tuple in
+         * the heap with it.
          */
-        tup = &state->memtuples[tupIndex];
-        state->mergenext[srcTape] = tup->tupindex;
-        if (state->mergenext[srcTape] == 0) {
-            state->mergelast[srcTape] = 0;
+        if (mergereadnext(state, srcTape, &stup)) {
+            stup.tupindex = srcTape;
+            tuplesort_heap_replace_top(state, &stup);
+        } else {
+            tuplesort_heap_delete_top(state);
         }
-        tup->tupindex = srcTape;
-        tuplesort_heap_replace_top(state, tup);
-        /* put the now-unused memtuples entry on the freelist */
-        tup->tupindex = state->mergefreelist;
-        state->mergefreelist = tupIndex;
-        state->mergeavailslots[srcTape]++;
     }

     /*
@@ -2577,17 +2813,18 @@ static void mergeonerun(Tuplesortstate* state)
  * beginmerge - initialize for a merge pass
  *
  * We decrease the counts of real and dummy runs for each tape, and mark
- * which tapes contain active input runs in mergeactive[]. Then, load
- * as many tuples as we can from each active input tape, and finally
- * fill the merge heap with the first tuple from each active tape.
+ * which tapes contain active input runs in mergeactive[]. Then, fill the
+ * merge heap with the first tuple from each active tape.
*/ -static void beginmerge(Tuplesortstate* state) +static void beginmerge(Tuplesortstate *state) { int activeTapes; int tapenum; int srcTape; - int slotsPerTape; - long spacePerTape; /* Heap should be empty here */ Assert(state->memtupcount == 0); @@ -2611,188 +2848,52 @@ static void beginmerge(Tuplesortstate* state) activeTapes++; } } - state->activeTapes = activeTapes; - /* Clear merge-pass state variables */ - rc = memset_s( - state->mergenext, state->maxTapes * sizeof(*state->mergenext), 0, state->maxTapes * sizeof(*state->mergenext)); - securec_check(rc, "\0", "\0"); - rc = memset_s( - state->mergelast, state->maxTapes * sizeof(*state->mergelast), 0, state->maxTapes * sizeof(*state->mergelast)); - securec_check(rc, "\0", "\0"); - state->mergefreelist = 0; /* nothing in the freelist */ - state->mergefirstfree = activeTapes; /* 1st slot avail for preread */ - - /* - * Initialize space allocation to let each active input tape have an equal - * share of preread space. For cluster environment, the memtupsize initial - * value is 1024, when DN num is very large, slotsPerTape maybe less than 1. - * For example, DN num is 720, slotsPerTape = (1024 - 720) / 720 = 0. - * So when slotsPerTape less than 1, we set slotsPerTape to MINIMAL_SLOTS_PER_TAPE, - * we also must change memtupsize and memtuples. - */ if (activeTapes <= 0) { ereport(ERROR, (errmodule(MOD_EXECUTOR), (errcode(ERRCODE_CHECK_VIOLATION), errmsg("ActiveTapes should be larger than zero.")))); } - slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes; - - // If this is coordinator node or stream merge sort, and slotsPerTape less than 1, - // change the slotsPerTape, memtuples and availMem. - if ((IS_PGXC_COORDINATOR || state->streamstate != NULL) && slotsPerTape < 1) { - slotsPerTape = MINIMAL_SLOTS_PER_TAPE; - - state->memtupsize = slotsPerTape * activeTapes + state->mergefirstfree; - // free memtuples memoery - FREEMEM(state, GetMemoryChunkSpace(state->memtuples)); - pfree_ext(state->memtuples); - // reapply memory for memtuples - state->memtuples = (SortTuple*)palloc(state->memtupsize * sizeof(SortTuple)); - USEMEM(state, GetMemoryChunkSpace(state->memtuples)); - - /* workMem must be large enough for the minimal memtuples array */ - if (LACKMEM(state)) { - ereport(ERROR, - (errmodule(MOD_EXECUTOR), - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("insufficient memory allowed for sort")))); - } - } - Assert(slotsPerTape > 0); - - spacePerTape = state->availMem / activeTapes; - for (srcTape = 0; srcTape < state->maxTapes; srcTape++) { - if (state->mergeactive[srcTape]) { - state->mergeavailslots[srcTape] = slotsPerTape; - state->mergeavailmem[srcTape] = spacePerTape; - } - } + state->activeTapes = activeTapes; #ifdef TRACE_SORT if (u_sess->attr.attr_common.trace_sort) { ereport(LOG, (errmodule(MOD_VEC_EXECUTOR), errmsg("Profiling LOG: " - "Sort(%d) Begin Merge : activeTapes: %d, slotsPerTape: %d, spacePerTape: %ld", - state->planId, - activeTapes, - slotsPerTape, - spacePerTape))); + "Sort(%d) Begin Merge : activeTapes: %d", + state->planId, + activeTapes))); } #endif - /* - * Preread as many tuples as possible (and at least one) from each active - * tape - */ - mergepreread(state); - /* Load the merge heap with the first tuple from each input tape */ for (srcTape = 0; srcTape < state->maxTapes; srcTape++) { - int tupIndex = state->mergenext[srcTape]; - SortTuple* tup = NULL; + SortTuple tup; - if (tupIndex) { - tup = &state->memtuples[tupIndex]; - state->mergenext[srcTape] = tup->tupindex; - if 
(state->mergenext[srcTape] == 0)
-                state->mergelast[srcTape] = 0;
-            tuplesort_heap_insert(state, tup, srcTape);
-            /* put the now-unused memtuples entry on the freelist */
-            tup->tupindex = state->mergefreelist;
-            state->mergefreelist = tupIndex;
-            state->mergeavailslots[srcTape]++;
+        if (mergereadnext(state, srcTape, &tup)) {
+            tup.tupindex = srcTape;
+            tuplesort_heap_insert(state, &tup, srcTape);
         }
     }
 }

-/*
- * mergepreread - load tuples from merge input tapes
- *
- * This routine exists to improve sequentiality of reads during a merge pass,
- * as explained in the header comments of this file. Load tuples from each
- * active source tape until the tape's run is exhausted or it has used up
- * its fair share of available memory. In any case, we guarantee that there
- * is at least one preread tuple available from each unexhausted input tape.
- *
- * We invoke this routine at the start of a merge pass for initial load,
- * and then whenever any tape's preread data runs out. Note that we load
- * as much data as possible from all tapes, not just the one that ran out.
- * This is because logtape.c works best with a usage pattern that alternates
- * between reading a lot of data and writing a lot of data, so whenever we
- * are forced to read, we should fill working memory completely.
- *
- * In FINALMERGE state, we *don't* use this routine, but instead just preread
- * from the single tape that ran dry. There's no read/write alternation in
- * that state and so no point in scanning through all the tapes to fix one.
- * (Moreover, there may be quite a lot of inactive tapes in that state, since
- * we might have had many fewer runs than tapes. In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.)
- */
-static void mergepreread(Tuplesortstate* state)
-{
-    int srcTape;
-
-    for (srcTape = 0; srcTape < state->maxTapes; srcTape++) {
-        mergeprereadone(state, srcTape);
-    }
-}
-
 /*
- * mergeprereadone - load tuples from one merge input tape
- *
- * Read tuples from the specified tape until it has used up its free memory
- * or array slots; but ensure that we have at least one tuple, if any are
- * to be had.
+ * mergereadnext - read next tuple from one merge input tape
+ *
+ * Returns false on EOF.
- */ -static void mergeprereadone(Tuplesortstate* state, int srcTape) +static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) { unsigned int tuplen; - SortTuple stup; - int tupIndex; - long priorAvail, spaceUsed; - if (state->mergeactive[srcTape] == false) { - return; /* tape's run is already exhausted */ + if (!state->mergeactive[srcTape]) + return false; /* tape's run is already exhausted */ + + /* read next tuple, if any */ + if ((tuplen = getlen(state, srcTape, true)) == 0) { + state->mergeactive[srcTape] = false; + return false; } - priorAvail = state->availMem; - state->availMem = state->mergeavailmem[srcTape]; - while (state->mergeavailslots[srcTape] > 0 || state->mergenext[srcTape] == 0) { - /* read next tuple, if any */ -#ifdef PGXC - if ((tuplen = GETLEN(state, srcTape, true)) == 0) -#else - if ((tuplen = getlen(state, srcTape, true)) == 0) -#endif - { - state->mergeactive[srcTape] = false; - break; - } - READTUP(state, &stup, srcTape, tuplen); - /* find a free slot in memtuples[] for it */ - tupIndex = state->mergefreelist; - if (tupIndex) { - state->mergefreelist = state->memtuples[tupIndex].tupindex; - } else { - tupIndex = state->mergefirstfree++; - Assert(tupIndex < state->memtupsize); - } - state->mergeavailslots[srcTape]--; - /* store tuple, append to list for its tape */ - stup.tupindex = 0; - state->memtuples[tupIndex] = stup; - if (state->mergelast[srcTape]) { - state->memtuples[state->mergelast[srcTape]].tupindex = tupIndex; - } else { - state->mergenext[srcTape] = tupIndex; - } - state->mergelast[srcTape] = tupIndex; - } - /* update per-tape and global availmem counts */ - spaceUsed = state->mergeavailmem[srcTape] - state->availMem; - state->mergeavailmem[srcTape] = state->availMem; - state->availMem = priorAvail - spaceUsed; + READTUP(state, stup, srcTape, tuplen); + + return true; } /* @@ -3269,6 +3370,38 @@ static void markrunend(Tuplesortstate* state, int tapenum) pgstat_increase_session_spill_size(sizeof(len)); } +/* + * Get memory for tuple from within READTUP() routine. Allocate + * memory and account for that, or consume from tape's batch + * allocation. + * + * Memory returned here in the final on-the-fly merge case is recycled + * from tape's batch allocation. Otherwise, callers must pfree() or + * reset tuple child memory context, and account for that with a + * FREEMEM(). Currently, this only ever needs to happen in WRITETUP() + * routines. + */ +static void* readtup_alloc(Tuplesortstate *state, Size tuplen) +{ + SlabSlot *buf; + + /* + * We pre-allocate enough slots in the slab arena that we should never run + * out. + */ + Assert(state->slabFreeHead); + + if (tuplen > SLAB_SLOT_SIZE || !state->slabFreeHead) + return MemoryContextAlloc(state->sortcontext, tuplen); + else { + buf = state->slabFreeHead; + /* Reuse this slot */ + state->slabFreeHead = buf->nextfree; + + return buf; + } +} + /* * Inline-able copy of FunctionCall2Coll() to save some cycles in sorting. 
 */
@@ -3399,6 +3532,7 @@ static void copytup_heap(Tuplesortstate* state, SortTuple* stup, void* tup)
     Datum original;
     MinimalTuple tuple;
     HeapTupleData htup;
+    MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);

     /* copy the tuple into sort storage */
     tuple = ExecCopySlotMinimalTuple(slot);
@@ -3411,6 +3545,9 @@ static void copytup_heap(Tuplesortstate* state, SortTuple* stup, void* tup)
     htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
     htup.t_data = (HeapTupleHeader)((char*)tuple - MINIMAL_TUPLE_OFFSET);
     original = tableam_tops_tuple_getattr(&htup, state->sortKeys[0].ssup_attno, state->tupDesc, &stup->isnull1);
+
+    MemoryContextSwitchTo(oldcontext);
+
     if (!state->sortKeys->abbrev_converter || stup->isnull1) {
         /*
          * Store ordinary Datum representation, or NULL value. If there is a
@@ -3472,20 +3609,21 @@ static void writetup_heap(Tuplesortstate* state, int tapenum, SortTuple* stup)
         pgstat_increase_session_spill_size(tuplen);
     }

-    FREEMEM(state, GetMemoryChunkSpace(tuple));
-    heap_free_minimal_tuple(tuple);
-    tuple = NULL;
+    if (!state->slabAllocatorUsed) {
+        FREEMEM(state, GetMemoryChunkSpace(tuple));
+        heap_free_minimal_tuple(tuple);
+        tuple = NULL;
+    }
 }

 static void readtup_heap(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int len)
 {
     unsigned int tupbodylen = len - sizeof(int);
     unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-    MinimalTuple tuple = (MinimalTuple)palloc(tuplen);
+    MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tuplen);
     char* tupbody = (char*)tuple + MINIMAL_TUPLE_DATA_OFFSET;
     HeapTupleData htup;

-    USEMEM(state, GetMemoryChunkSpace(tuple));
     /* read in the tuple proper */
     tuple->t_len = tuplen;
     LogicalTapeReadExact(state->tapeset, tapenum, tupbody, tupbodylen);
@@ -3610,9 +3748,11 @@ static int comparetup_cluster(const SortTuple* a, const SortTuple* b, Tuplesorts

 static void copytup_cluster(Tuplesortstate* state, SortTuple* stup, Tuple tup)
 {
+    MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
     Tuple tuple = tableam_tops_copy_tuple(tup);
     stup->tuple = tuple;
     USEMEM(state, GetMemoryChunkSpace(tuple));
+    MemoryContextSwitchTo(oldcontext);

     /* set up first-column key value, if it's a simple column */
     if (state->indexInfo->ii_KeyAttrNumbers[0] != 0) {
         stup->datum1 = tableam_tops_tuple_getattr(tuple,
@@ -3643,14 +3783,17 @@ static void writetup_cluster(Tuplesortstate* state, int tapenum, SortTuple* stup
         pgstat_increase_session_spill_size(tuplen);
     }

-    FREEMEM(state, GetMemoryChunkSpace(tuple));
-    heap_freetuple_ext(tuple);
+    if (!state->slabAllocatorUsed) {
+        FREEMEM(state, GetMemoryChunkSpace(tuple));
+        heap_freetuple_ext(tuple);
+    }
 }

 static void readtup_cluster(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int tuplen)
 {
     unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int) - sizeof(TransactionId) * 2;
-    HeapTuple tuple = (HeapTuple)heaptup_alloc(t_len + HEAPTUPLESIZE);
+    HeapTuple tuple = (HeapTuple) readtup_alloc(state, t_len + HEAPTUPLESIZE);
+
+    tuple->tupTableType = HEAP_TUPLE;
-    USEMEM(state, GetMemoryChunkSpace(tuple));

     /* Reconstruct the HeapTupleData header */
@@ -3878,7 +4021,7 @@ static void copytup_index(Tuplesortstate* state, SortTuple* stup, void* tup)
     IndexTuple newtuple;

     /* copy the tuple into sort storage */
-    newtuple = (IndexTuple)palloc(tuplen);
+    newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
     errno_t rc = memcpy_s(newtuple, tuplen, tuple, tuplen);
     securec_check(rc, "\0", "\0");

@@ -3907,16 +4050,17 @@ static void
writetup_index(Tuplesortstate* state, int tapenum, SortTuple* stup) pgstat_increase_session_spill_size(tuplen); } - FREEMEM(state, GetMemoryChunkSpace(tuple)); - pfree_ext(tuple); + if (!state->slabAllocatorUsed) { + FREEMEM(state, GetMemoryChunkSpace(tuple)); + pfree_ext(tuple); + } } static void readtup_index(Tuplesortstate* state, SortTuple* stup, int tapenum, unsigned int len) { unsigned int tuplen = len - sizeof(unsigned int); - IndexTuple tuple = (IndexTuple)palloc(tuplen); + IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen); - USEMEM(state, GetMemoryChunkSpace(tuple)); LogicalTapeReadExact(state->tapeset, tapenum, tuple, tuplen); if (state->randomAccess) { /* need trailing length word? */ @@ -3971,7 +4115,7 @@ static void writetup_datum(Tuplesortstate* state, int tapenum, SortTuple* stup) if (stup->isnull1) { waddr = NULL; tuplen = 0; - } else if (state->datumTypeByVal) { + } else if (!state->tuples) { waddr = &stup->datum1; tuplen = sizeof(Datum); } else { @@ -3996,7 +4140,7 @@ static void writetup_datum(Tuplesortstate* state, int tapenum, SortTuple* stup) pgstat_increase_session_spill_size(writtenlen); } - if (stup->tuple != NULL) { + if (!state->slabAllocatorUsed && stup->tuple != NULL) { FREEMEM(state, GetMemoryChunkSpace(stup->tuple)); pfree_ext(stup->tuple); } @@ -4011,19 +4155,18 @@ static void readtup_datum(Tuplesortstate* state, SortTuple* stup, int tapenum, u stup->datum1 = (Datum)0; stup->isnull1 = true; stup->tuple = NULL; - } else if (state->datumTypeByVal) { + } else if (!state->tuples) { Assert(tuplen == sizeof(Datum)); LogicalTapeReadExact(state->tapeset, tapenum, &stup->datum1, tuplen); stup->isnull1 = false; stup->tuple = NULL; } else { - void* raddr = palloc(tuplen); + void* raddr = readtup_alloc(state, tuplen); LogicalTapeReadExact(state->tapeset, tapenum, raddr, tuplen); stup->datum1 = PointerGetDatum(raddr); stup->isnull1 = false; stup->tuple = raddr; - USEMEM(state, GetMemoryChunkSpace(raddr)); } if (state->randomAccess) { @@ -4684,11 +4827,6 @@ Tuplesortstate* tuplesort_begin_merge(TupleDesc tupDesc, int nkeys, AttrNumber* state->tapeRange = conn_count; state->mergeactive = (bool*)palloc0(conn_count * sizeof(bool)); - state->mergenext = (int*)palloc0(conn_count * sizeof(int)); - state->mergelast = (int*)palloc0(conn_count * sizeof(int)); - state->mergeavailslots = (int*)palloc0(conn_count * sizeof(int)); - state->mergeavailmem = (long*)palloc0(conn_count * sizeof(long)); - state->tp_runs = (int*)palloc0(conn_count * sizeof(int)); state->tp_dummy = (int*)palloc0(conn_count * sizeof(int)); state->tp_tapenum = (int*)palloc0(conn_count * sizeof(int)); diff --git a/src/gausskernel/optimizer/commands/cluster.cpp b/src/gausskernel/optimizer/commands/cluster.cpp index 9dc914569..1aef4df27 100755 --- a/src/gausskernel/optimizer/commands/cluster.cpp +++ b/src/gausskernel/optimizer/commands/cluster.cpp @@ -170,7 +170,7 @@ static void VacFullCompaction(Relation oldHeap, Oid partOid); } #endif void swapRelationIndicesRelfileNode(Relation rel1, Relation rel2, uint8 needSwitch); -static void GttSwapRelationFiles(Oid r1, Oid r2, TransactionId frozenXid); +static void GttSwapRelationFiles(Oid r1, Oid r2, TransactionId frozenXid); static void HbktModifyPartIndexRelnode(Relation indexRel, Partition indexPart, DataTransferType transferType, Oid bucketOid); @@ -1897,11 +1897,9 @@ double CopyUHeapDataInternal(Relation oldHeap, Relation oldIndex, Relation newHe for (;;) { UHeapTuple utuple; - bool shouldfree = false; - CHECK_FOR_INTERRUPTS(); - utuple = 
(UHeapTuple)tuplesort_getheaptuple(tuplesort, true, &shouldfree); + utuple = (UHeapTuple)tuplesort_getheaptuple(tuplesort, true); if (utuple == NULL) break; @@ -1909,9 +1907,6 @@ double CopyUHeapDataInternal(Relation oldHeap, Relation oldIndex, Relation newHe ReformAndRewriteUTuple(utuple, oldTupDesc, newTupDesc, values, isnull, newHeap->rd_rel->relhasoids, rwstate); - - if (shouldfree) - UHeapFreeTuple(utuple); } tuplesort_end(tuplesort); @@ -2226,19 +2221,14 @@ double copy_heap_data_internal(Relation OldHeap, Relation OldIndex, Relation New for (;;) { HeapTuple tuple; - bool shouldfree = false; - CHECK_FOR_INTERRUPTS(); - tuple = (HeapTuple)tuplesort_getheaptuple(tuplesort, true, &shouldfree); + tuple = (HeapTuple)tuplesort_getheaptuple(tuplesort, true); if (tuple == NULL) break; reform_and_rewrite_tuple( tuple, oldTupDesc, newTupDesc, values, isnull, NewHeap->rd_rel->relhasoids, rwstate); - - if (shouldfree) - heap_freetuple(tuple); } tuplesort_end(tuplesort); @@ -3369,7 +3359,7 @@ void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool is_system_catalog, bo */ if (get_rel_persistence(OIDOldHeap) == RELPERSISTENCE_GLOBAL_TEMP) { Assert(!is_system_catalog); - GttSwapRelationFiles(OIDOldHeap, OIDNewHeap, frozenXid); + GttSwapRelationFiles(OIDOldHeap, OIDNewHeap, frozenXid); } swap_relation_files(OIDOldHeap, OIDNewHeap, (OIDOldHeap == RelationRelationId), @@ -3575,7 +3565,7 @@ static List* get_tables_to_cluster(MemoryContext cluster_context) return rvs; } -static void GttSwapRelationFiles(Oid r1, Oid r2, TransactionId frozenXid) +static void GttSwapRelationFiles(Oid r1, Oid r2, TransactionId frozenXid) { Oid relfilenode1; Oid relfilenode2; @@ -3589,7 +3579,7 @@ static void GttSwapRelationFiles(Oid r1, Oid r2, TransactionId frozenXid) relfilenode2 = gtt_fetch_current_relfilenode(r2); Assert(OidIsValid(relfilenode1) && OidIsValid(relfilenode2)); - gtt_switch_rel_relfilenode(r1, relfilenode1, r2, relfilenode2, true, frozenXid); + gtt_switch_rel_relfilenode(r1, relfilenode1, r2, relfilenode2, true, frozenXid); CacheInvalidateRelcache(rel1); CacheInvalidateRelcache(rel2); diff --git a/src/gausskernel/storage/access/hash/hashsort.cpp b/src/gausskernel/storage/access/hash/hashsort.cpp index f5a9aab4e..ca608d1c5 100644 --- a/src/gausskernel/storage/access/hash/hashsort.cpp +++ b/src/gausskernel/storage/access/hash/hashsort.cpp @@ -113,13 +113,10 @@ void _h_spool(HSpool *hspool, ItemPointer self, Datum *values, const bool *isnul void _h_indexbuild(HSpool *hspool, Relation heapRel) { IndexTuple itup; - bool should_free = false; tuplesort_performsort(hspool->sortstate); - while ((itup = tuplesort_getindextuple(hspool->sortstate, true, &should_free)) != NULL) { + while ((itup = tuplesort_getindextuple(hspool->sortstate, true)) != NULL) { _hash_doinsert(hspool->index, itup, heapRel); - if (should_free) - pfree(itup); } } diff --git a/src/gausskernel/storage/access/nbtree/nbtsort.cpp b/src/gausskernel/storage/access/nbtree/nbtsort.cpp index 729fa7085..45634b511 100644 --- a/src/gausskernel/storage/access/nbtree/nbtsort.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtsort.cpp @@ -833,8 +833,6 @@ static void _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) bool merge = (btspool2 != NULL); IndexTuple itup = NULL; IndexTuple itup2 = NULL; - bool should_free = false; - bool should_free2 = false; bool load1 = false; TupleDesc tupdes = RelationGetDescr(wstate->index); int keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index); @@ -847,8 +845,8 @@ static void _bt_load(BTWriteState 
*wstate, BTSpool *btspool, BTSpool *btspool2) * * the preparation of merge */ - itup = tuplesort_getindextuple(btspool->sortstate, true, &should_free); - itup2 = tuplesort_getindextuple(btspool2->sortstate, true, &should_free2); + itup = tuplesort_getindextuple(btspool->sortstate, true); + itup2 = tuplesort_getindextuple(btspool2->sortstate, true); indexScanKey = _bt_mkscankey_nodata(wstate->index); for (;;) { @@ -863,33 +861,21 @@ static void _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2) if (load1) { _bt_buildadd(wstate, state, itup); - if (should_free) { - pfree(itup); - itup = NULL; - } - itup = tuplesort_getindextuple(btspool->sortstate, true, &should_free); + itup = tuplesort_getindextuple(btspool->sortstate, true); } else { _bt_buildadd(wstate, state, itup2); - if (should_free2) { - pfree(itup2); - itup2 = NULL; - } - itup2 = tuplesort_getindextuple(btspool2->sortstate, true, &should_free2); + itup2 = tuplesort_getindextuple(btspool2->sortstate, true); } } _bt_freeskey(indexScanKey); } else { /* merge is unnecessary */ - while ((itup = tuplesort_getindextuple(btspool->sortstate, true, &should_free)) != NULL) { + while ((itup = tuplesort_getindextuple(btspool->sortstate, true)) != NULL) { /* When we see first tuple, create first index page */ if (state == NULL) state = _bt_pagestate(wstate, 0); _bt_buildadd(wstate, state, itup); - if (should_free) { - pfree(itup); - itup = NULL; - } } } diff --git a/src/gausskernel/storage/access/ubtree/ubtsort.cpp b/src/gausskernel/storage/access/ubtree/ubtsort.cpp index 928aad4df..c36cd0edb 100644 --- a/src/gausskernel/storage/access/ubtree/ubtsort.cpp +++ b/src/gausskernel/storage/access/ubtree/ubtsort.cpp @@ -750,8 +750,6 @@ static void UBTreeLoad(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2 bool merge = (btspool2 != NULL); IndexTuple itup = NULL; IndexTuple itup2 = NULL; - bool should_free = false; - bool should_free2 = false; bool load1 = false; TupleDesc tupdes = RelationGetDescr(wstate->index); int keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index); @@ -763,8 +761,8 @@ static void UBTreeLoad(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2 * * the preparation of merge */ - itup = tuplesort_getindextuple(btspool->sortstate, true, &should_free); - itup2 = tuplesort_getindextuple(btspool2->sortstate, true, &should_free2); + itup = tuplesort_getindextuple(btspool->sortstate, true); + itup2 = tuplesort_getindextuple(btspool2->sortstate, true); for (;;) { if (itup == NULL && itup2 == NULL) { @@ -780,32 +778,20 @@ static void UBTreeLoad(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2 if (load1) { UBTreeBuildAdd(wstate, state, itup, false); - if (should_free) { - pfree(itup); - itup = NULL; - } - itup = tuplesort_getindextuple(btspool->sortstate, true, &should_free); + itup = tuplesort_getindextuple(btspool->sortstate, true); } else { UBTreeBuildAdd(wstate, state, itup2, false); - if (should_free2) { - pfree(itup2); - itup2 = NULL; - } - itup2 = tuplesort_getindextuple(btspool2->sortstate, true, &should_free2); + itup2 = tuplesort_getindextuple(btspool2->sortstate, true); } } } else { /* merge is unnecessary */ - while ((itup = tuplesort_getindextuple(btspool->sortstate, true, &should_free)) != NULL) { + while ((itup = tuplesort_getindextuple(btspool->sortstate, true)) != NULL) { /* When we see first tuple, create first index page */ if (state == NULL) state = UBTreePageState(wstate, 0); UBTreeBuildAdd(wstate, state, itup, false); - if (should_free) { - pfree(itup); - itup = NULL; - } } } 
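All of the call-site changes above (cluster.cpp, hashsort.cpp, nbtsort.cpp,
ubtsort.cpp) follow from the same new contract: a tuple returned by
tuplesort_getheaptuple() or tuplesort_getindextuple() now lives in memory
owned by the tuplesort (a slab slot, or a sort-context allocation for
oversized tuples), remains valid only until the next fetch, and must never
be pfree'd by the caller. A minimal sketch of the new consumption loop,
where consume_tuple() is a hypothetical stand-in for the per-caller work
(_hash_doinsert, _bt_buildadd, UBTreeBuildAdd, ...) and not part of this
patch:

    IndexTuple itup;

    tuplesort_performsort(sortstate);
    while ((itup = tuplesort_getindextuple(sortstate, true)) != NULL) {
        /*
         * Use itup before the next fetch: its memory belongs to the
         * tuplesort and is recycled by the following call. Do not pfree().
         */
        consume_tuple(itup);
    }
    tuplesort_end(sortstate);
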
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h index 642e8e802..5b5174450 100644 --- a/src/include/utils/logtape.h +++ b/src/include/utils/logtape.h @@ -68,5 +68,6 @@ extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, long blocknum, int offset); extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, long *blocknum, int *offset); extern long LogicalTapeSetBlocks(LogicalTapeSet *lts); +extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t bufsize); #endif /* LOGTAPE_H */ \ No newline at end of file diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h index 7bf3a711b..de537ed51 100644 --- a/src/include/utils/tuplesort.h +++ b/src/include/utils/tuplesort.h @@ -152,8 +152,8 @@ extern void tuplesort_performsort(Tuplesortstate* state); extern bool tuplesort_gettupleslot(Tuplesortstate* state, bool forward, TupleTableSlot* slot, Datum* abbrev); extern bool tuplesort_gettupleslot_into_tuplestore( Tuplesortstate* state, bool forward, TupleTableSlot* slot, Datum* abbrev, Tuplestorestate* tstate); -extern void* tuplesort_getheaptuple(Tuplesortstate* state, bool forward, bool* should_free); -extern IndexTuple tuplesort_getindextuple(Tuplesortstate* state, bool forward, bool* should_free); +extern void* tuplesort_getheaptuple(Tuplesortstate* state, bool forward); +extern IndexTuple tuplesort_getindextuple(Tuplesortstate* state, bool forward); extern bool tuplesort_getdatum(Tuplesortstate* state, bool forward, Datum* val, bool* isNull); extern void tuplesort_end(Tuplesortstate* state); diff --git a/src/test/regress/pg_regress.cpp b/src/test/regress/pg_regress.cpp index a2559b1aa..6cfc239cc 100644 --- a/src/test/regress/pg_regress.cpp +++ b/src/test/regress/pg_regress.cpp @@ -5461,7 +5461,7 @@ static void check_global_variables() } } -#define BASE_PGXC_LIKE_MACRO_NUM 1391 +#define BASE_PGXC_LIKE_MACRO_NUM 1390 static void check_pgxc_like_macros() { #ifdef BUILD_BY_CMAKE
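
The slab allocator this patch introduces reduces to an intrusive free list
threaded through a single fixed-size arena, with a heap fallback for tuples
larger than one slot. The following standalone C sketch mirrors
init_slab_allocator(), readtup_alloc() and RELEASE_SLAB_SLOT(); it is an
illustration only, not the kernel code (the pointer-range test is the same
idea as the IS_SLAB_SLOT() macro):

    #include <stdio.h>
    #include <stdlib.h>

    #define SLAB_SLOT_SIZE 1024

    typedef union SlabSlot {
        union SlabSlot *nextfree;
        char buffer[SLAB_SLOT_SIZE];
    } SlabSlot;

    /* One arena and one free list -- the shape of init_slab_allocator(). */
    static char *slab_begin;
    static char *slab_end;
    static SlabSlot *free_head;

    static void slab_init(int num_slots)
    {
        slab_begin = (char *) malloc((size_t) num_slots * SLAB_SLOT_SIZE);
        slab_end = slab_begin + (size_t) num_slots * SLAB_SLOT_SIZE;
        free_head = (SlabSlot *) slab_begin;

        /* Chain every slot into the free list; the last one ends it. */
        char *p = slab_begin;
        for (int i = 0; i < num_slots - 1; i++) {
            ((SlabSlot *) p)->nextfree = (SlabSlot *) (p + SLAB_SLOT_SIZE);
            p += SLAB_SLOT_SIZE;
        }
        ((SlabSlot *) p)->nextfree = NULL;
    }

    /* The shape of readtup_alloc(): pop a slot, or fall back to the heap. */
    static void *slab_alloc(size_t len)
    {
        if (len > SLAB_SLOT_SIZE || free_head == NULL)
            return malloc(len);
        SlabSlot *buf = free_head;
        free_head = buf->nextfree;
        return buf;
    }

    /* The shape of RELEASE_SLAB_SLOT(): push back, or free() heap memory. */
    static void slab_release(void *tuple)
    {
        char *p = (char *) tuple;

        if (p >= slab_begin && p < slab_end) {
            ((SlabSlot *) p)->nextfree = free_head;
            free_head = (SlabSlot *) p;
        } else {
            free(tuple);
        }
    }

    int main(void)
    {
        slab_init(4);                  /* e.g. 3 input tapes + 1 returned tuple */

        void *small = slab_alloc(100); /* served from the slab arena */
        void *big = slab_alloc(4096);  /* > SLAB_SLOT_SIZE: heap fallback */

        slab_release(small);           /* slot goes back on the free list */
        slab_release(big);             /* heap memory is free()'d */

        printf("arena size: %zu bytes\n", (size_t) (slab_end - slab_begin));
        return 0;
    }

The read-buffer sizing in init_tape_buffers() is plain integer division,
with the remainder spread one block at a time over the low-numbered tapes.
For example, assuming the default 8 kB BLCKSZ, with availMem = 16 MB and 6
input tapes: availBlocks = 2048, blocksPerTape = 2048 / 6 = 341, remainder
= 2, so tapes 0 and 1 each get 342 blocks (2736 kB) and tapes 2 through 5
get 341 blocks (2728 kB). LogicalTapeAssignReadBufferSize() only records
the size; the buffer itself is allocated when the tape is rewound for
reading.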