diff --git a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp index 3991af570..9e82a7d1a 100644 --- a/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp +++ b/src/gausskernel/storage/replication/logical/parallel_reorderbuffer.cpp @@ -80,12 +80,14 @@ static void ParallelReorderBufferRestoreChange(ParallelReorderBuffer *rb, Parall /* Parallel decoding batch sending unit length is set to 1MB. */ static const int g_batch_unit_length = 1 * 1024 * 1024; +/* Parallel decoding caches at most 512 transactions. */ +static const Size g_max_cached_txns = 512; + void ParallelReorderBufferQueueChange(ParallelReorderBuffer *rb, logicalLog *change, int slotId) { ParallelReorderBufferTXN *txn = NULL; txn = ParallelReorderBufferTXNByXid(rb, change->xid, true, NULL, change->lsn, true); - Assert(InvalidXLogRecPtr != change->lsn); dlist_push_tail(&txn->changes, &change->node); txn->nentries++; txn->nentries_mem++; @@ -238,7 +240,13 @@ ParallelReorderBufferTXN *ParallelReorderBufferGetTXN(ParallelReorderBuffer *rb) ParallelReorderBufferTXN *txn = NULL; int rc = 0; /* check the slab cache */ - txn = (ParallelReorderBufferTXN *)palloc(sizeof(ParallelReorderBufferTXN)); + if (rb->nr_cached_transactions > 0) { + rb->nr_cached_transactions--; + txn = (ParallelReorderBufferTXN *)dlist_container(ParallelReorderBufferTXN, node, + dlist_pop_head_node(&rb->cached_transactions)); + } else { + txn = (ParallelReorderBufferTXN *)palloc(sizeof(ParallelReorderBufferTXN)); + } rc = memset_s(txn, sizeof(ParallelReorderBufferTXN), 0, sizeof(ParallelReorderBufferTXN)); securec_check(rc, "", ""); @@ -249,6 +257,7 @@ ParallelReorderBufferTXN *ParallelReorderBufferGetTXN(ParallelReorderBuffer *rb) return txn; } + void ParallelReorderBufferReturnTXN(ParallelReorderBuffer *rb, ParallelReorderBufferTXN *txn) { /* clean the lookup cache if we were cached (quite likely) */ @@ -270,6 +279,13 @@ void ParallelReorderBufferReturnTXN(ParallelReorderBuffer *rb, ParallelReorderBu txn->nentries = 0; txn->nentries_mem = 0; + /* check whether to put into the slab cache */ + if (rb->nr_cached_transactions < g_max_cached_txns) { + rb->nr_cached_transactions++; + dlist_push_head(&rb->cached_transactions, &txn->node); + } else { + pfree(txn); + } } /* --------------------------------------- * toast reassembly support @@ -1163,6 +1179,9 @@ static logicalLog *ParallelReorderBufferIterTXNNext(ParallelReorderBuffer *rb, entry = &state->entries[off]; if (!dlist_is_empty(&state->old_change)) { + change = dlist_container(logicalLog, node, dlist_pop_head_node(&state->old_change)); + FreeLogicalLog(change, slotId); + /* We should find atmost one old_change here */ Assert(dlist_is_empty(&state->old_change)); } @@ -1219,6 +1238,29 @@ static logicalLog *ParallelReorderBufferIterTXNNext(ParallelReorderBuffer *rb, return change; } +/* + * Deallocate the iterator + */ +static void ParallelReorderBufferIterTXNFinish(ParallelReorderBufferIterTXNState *state, int slotId) +{ + for (Size off = 0; off < state->nr_txns; off++) { + if (state->entries[off].fd != -1) { + (void)CloseTransientFile(state->entries[off].fd); + } + } + + /* free memory we might have "leaked" in the last *Next call */ + if (!dlist_is_empty(&state->old_change)) { + logicalLog *change = NULL; + change = dlist_container(logicalLog, node, dlist_pop_head_node(&state->old_change)); + FreeLogicalLog(change, slotId); + Assert(dlist_is_empty(&state->old_change)); + } + + binaryheap_free(state->heap); + pfree(state); +} + /* * Forget the contents of a transaction if we aren't interested in it's * contents. Needs to be first called for subtransactions and then for the @@ -1237,6 +1279,7 @@ void ParallelReorderBufferForget(ParallelReorderBuffer *rb, int slotId, Parallel FreeLogicalLog(logChange, slotId); } ParallelReorderBufferCleanupTXN(rb, txn); + ParallelReorderBufferIterTXNFinish(iterstate, slotId); } /* @@ -1440,6 +1483,7 @@ void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change, g_Logicaldispatcher[slotId].pOptions.sending_batch > 0); } ParallelReorderBufferCleanupTXN(rb, txn); + ParallelReorderBufferIterTXNFinish(iterstate, slotId); ParallelCheckBatch(ctx->out, pdata, slotId, &oldCtx, (pdata->pOptions.skip_empty_xacts && !pdata->pOptions.xact_wrote_changes)); @@ -1545,12 +1589,14 @@ ParallelReorderBuffer *ParallelReorderBufferAllocate(int slotId) hash_ctl.hcxt = buffer->context; const long reorderBufferNelem = 1000; - buffer->by_txn = hash_create("ReorderBufferByXid", reorderBufferNelem, + buffer->by_txn = hash_create("ParallelReorderBufferByXid", reorderBufferNelem, &hash_ctl, HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT); buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; + buffer->nr_cached_transactions = 0; + buffer->nr_cached_tuplebufs = 0; buffer->nr_cached_changes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; diff --git a/src/include/replication/parallel_reorderbuffer.h b/src/include/replication/parallel_reorderbuffer.h index 3a69b3b20..866b53cf5 100644 --- a/src/include/replication/parallel_reorderbuffer.h +++ b/src/include/replication/parallel_reorderbuffer.h @@ -340,12 +340,15 @@ struct ParallelReorderBuffer { * ontop of reorderbuffer.c */ + /* cached ParallelReorderBufferTXNs */ + dlist_head cached_transactions; + Size nr_cached_transactions; - /* cached ReorderBufferChanges */ + /* cached ParallelReorderBufferChanges */ dlist_head cached_changes; Size nr_cached_changes; - /* cached ReorderBufferTupleBufs */ + /* cached ParallelReorderBufferTupleBufs */ slist_head cached_tuplebufs; Size nr_cached_tuplebufs; TransactionId lastRunningXactOldestXmin;