diff --git a/src/gausskernel/runtime/executor/execParallel.cpp b/src/gausskernel/runtime/executor/execParallel.cpp index 7f4644d0c..3a6b3f12e 100644 --- a/src/gausskernel/runtime/executor/execParallel.cpp +++ b/src/gausskernel/runtime/executor/execParallel.cpp @@ -145,14 +145,14 @@ static bool ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateConte /* Call estimators for parallel-aware nodes. */ if (planstate->plan->parallel_aware) { - switch (nodeTag(planstate)) { - case T_SeqScanState: - ExecSeqScanEstimate((SeqScanState *)planstate, e->pcxt); - break; - default: - break; - } - } + switch (nodeTag(planstate)) { + case T_SeqScanState: + ExecSeqScanEstimate((SeqScanState *)planstate, e->pcxt); + break; + default: + break; + } + } return planstate_tree_walker(planstate, (bool (*)())ExecParallelEstimate, e); } @@ -179,16 +179,16 @@ static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitiali knl_u_parallel_context *cxt = (knl_u_parallel_context *)d->pcxt->seg; /* Call initializers for parallel-aware plan nodes. */ - if (planstate->plan->parallel_aware) { - switch (nodeTag(planstate)) { - case T_SeqScanState: - ExecSeqScanInitializeDSM((SeqScanState *)planstate, d->pcxt, cxt->pwCtx->pscan_num); - cxt->pwCtx->pscan_num++; - break; - default: - break; - } - } + if (planstate->plan->parallel_aware) { + switch (nodeTag(planstate)) { + case T_SeqScanState: + ExecSeqScanInitializeDSM((SeqScanState *)planstate, d->pcxt, cxt->pwCtx->queryInfo.pscan_num); + cxt->pwCtx->queryInfo.pscan_num++; + break; + default: + break; + } + } return planstate_tree_walker(planstate, (bool (*)())ExecParallelInitializeDSM, d); } @@ -211,10 +211,10 @@ static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool * otherwise, find the already allocated space. */ if (!reinitialize) { - cxt->pwCtx->tupleQueue = (char *)palloc0(PARALLEL_TUPLE_QUEUE_SIZE * (Size)pcxt->nworkers); + cxt->pwCtx->queryInfo.tupleQueue = (char *)palloc0(PARALLEL_TUPLE_QUEUE_SIZE * (Size)pcxt->nworkers); } - Assert(cxt->pwCtx->tupleQueue != NULL); - char *tqueuespace = cxt->pwCtx->tupleQueue; + Assert(cxt->pwCtx->queryInfo.tupleQueue != NULL); + char *tqueuespace = cxt->pwCtx->queryInfo.tupleQueue; /* Create the queues, and become the receiver for each. */ for (int i = 0; i < pcxt->nworkers; ++i) { @@ -227,6 +227,27 @@ static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool return responseq; } +/* + * Set up tuple queue readers to read the results of a parallel subplan. + * All the workers are expected to return tuples matching tupDesc. + * + * This is separate from ExecInitParallelPlan() because we can launch the + * worker processes and let them start doing something before we do this. + */ +void ExecParallelCreateReaders(ParallelExecutorInfo *pei, TupleDesc tupDesc) +{ + Assert(pei->reader == NULL); + int nworkers = pei->pcxt->nworkers_launched; + if (nworkers > 0) { + pei->reader = (TupleQueueReader **)palloc((uint32)nworkers * sizeof(TupleQueueReader *)); + for (int i = 0; i < nworkers; i++) { + shm_mq_set_handle(pei->tqueue[i], pei->pcxt->worker[i].bgwhandle); + pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i], tupDesc); + } + } +} + + /* * Re-initialize the parallel executor info such that it can be reused by * workers. @@ -235,6 +256,7 @@ void ExecParallelReinitialize(ParallelExecutorInfo *pei) { ReinitializeParallelDSM(pei->pcxt); pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); + pei->reader = NULL; pei->finished = false; } @@ -289,21 +311,23 @@ ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, * asked for has been allocated or initialized yet, though, so do that. */ MemoryContext oldcontext = MemoryContextSwitchTo(cxt->memCtx); + ParallelQueryInfo queryInfo; + int rc = memset_s(&queryInfo, sizeof(ParallelQueryInfo), 0, sizeof(ParallelQueryInfo)); + securec_check(rc, "", ""); /* Store serialized PlannedStmt. */ - cxt->pwCtx->pstmt_space = ExecSerializePlan(planstate->plan, estate); + queryInfo.pstmt_space = ExecSerializePlan(planstate->plan, estate); /* Store serialized ParamListInfo. */ - cxt->pwCtx->param_space = (char *)palloc0(param_len); - cxt->pwCtx->param_len = param_len; - SerializeParamList(estate->es_param_list_info, cxt->pwCtx->param_space, param_len); + queryInfo.param_space = (char *)palloc0(param_len); + queryInfo.param_len = param_len; + SerializeParamList(estate->es_param_list_info, queryInfo.param_space, param_len); /* Allocate space for each worker's BufferUsage; no need to initialize. */ - cxt->pwCtx->bufUsage = (BufferUsage *)palloc0(sizeof(BufferUsage) * pcxt->nworkers); - pei->buffer_usage = cxt->pwCtx->bufUsage; - - /* Set up tuple queues. */ - pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); + queryInfo.bufUsage = (BufferUsage *)palloc0(sizeof(BufferUsage) * pcxt->nworkers); + pei->buffer_usage = queryInfo.bufUsage; + /* We don't need the TupleQueueReaders yet, though. */ + pei->reader = NULL; /* * If instrumentation options were supplied, allocate space for the @@ -311,19 +335,19 @@ ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, * during ExecParallelInitializeDSM. */ if (estate->es_instrument) { - cxt->pwCtx->instrumentation = (SharedExecutorInstrumentation *)palloc0(instrumentation_len); - cxt->pwCtx->instrumentation->instrument_options = estate->es_instrument; - cxt->pwCtx->instrumentation->instrument_offset = instrument_offset; - cxt->pwCtx->instrumentation->num_workers = nworkers; - cxt->pwCtx->instrumentation->num_plan_nodes = e.nnodes; - Instrumentation *instrument = GetInstrumentationArray(cxt->pwCtx->instrumentation); + queryInfo.instrumentation = (SharedExecutorInstrumentation *)palloc0(instrumentation_len); + queryInfo.instrumentation->instrument_options = estate->es_instrument; + queryInfo.instrumentation->instrument_offset = instrument_offset; + queryInfo.instrumentation->num_workers = nworkers; + queryInfo.instrumentation->num_plan_nodes = e.nnodes; + Instrumentation *instrument = GetInstrumentationArray(queryInfo.instrumentation); for (int i = 0; i < nworkers * e.nnodes; ++i) { InstrInit(&instrument[i], estate->es_instrument); } - pei->instrumentation = cxt->pwCtx->instrumentation; + pei->instrumentation = queryInfo.instrumentation; } - cxt->pwCtx->pscan = (ParallelHeapScanDesc *)palloc0(sizeof(ParallelHeapScanDesc) * e.nnodes); + queryInfo.pscan = (ParallelHeapScanDesc *)palloc0(sizeof(ParallelHeapScanDesc) * e.nnodes); /* * Give parallel-aware nodes a chance to initialize their shared data. @@ -331,9 +355,14 @@ ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, * if it exists. */ d.pcxt = pcxt; - d.instrumentation = cxt->pwCtx->instrumentation; + d.instrumentation = queryInfo.instrumentation; d.nnodes = 0; + cxt->pwCtx->queryInfo = queryInfo; + + /* Set up the tuple queues that the workers will write into. */ + pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); + /* Here we switch to old context, cause heap_beginscan_parallel need malloc memory */ (void)MemoryContextSwitchTo(oldcontext); (void)ExecParallelInitializeDSM(planstate, &d); @@ -397,12 +426,43 @@ void ExecParallelFinish(ParallelExecutorInfo *pei) if (pei->finished) return; - /* First, wait for the workers to finish. */ + int nworkers = pei->pcxt->nworkers_launched; + int i; + + /* + * Detach from tuple queues ASAP, so that any still-active workers will + * notice that no further results are wanted. + */ + if (pei->tqueue != NULL) { + for (i = 0; i < nworkers; i++) { + shm_mq_detach(pei->tqueue[i]); + } + pfree(pei->tqueue); + pei->tqueue = NULL; + } + + /* + * While we're waiting for the workers to finish, let's get rid of the + * tuple queue readers. (Any other local cleanup could be done here too.) + */ + if (pei->reader != NULL) { + for (i = 0; i < nworkers; i++) { + DestroyTupleQueueReader(pei->reader[i]); + } + pfree(pei->reader); + pei->reader = NULL; + } + + /* Now wait for the workers to finish. */ WaitForParallelWorkersToFinish(pei->pcxt); - /* Next, accumulate buffer usage. */ - for (int i = 0; i < pei->pcxt->nworkers; ++i) + /* + * Next, accumulate buffer usage. (This must wait for the workers to + * finish, or we might get incomplete data.) + */ + for (i = 0; i < nworkers; ++i) { InstrAccumParallelQuery(&pei->buffer_usage[i]); + } /* Finally, accumulate instrumentation, if any. */ if (pei->instrumentation) { @@ -436,7 +496,7 @@ static DestReceiver *ExecParallelGetReceiver(void *seg) Assert(seg != NULL); knl_u_parallel_context *cxt = (knl_u_parallel_context *)seg; - char *mqspace = cxt->pwCtx->tupleQueue; + char *mqspace = cxt->pwCtx->queryInfo.tupleQueue; mqspace += t_thrd.bgworker_cxt.ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE; shm_mq *mq = (shm_mq *)mqspace; shm_mq_set_sender(mq, t_thrd.proc); @@ -451,10 +511,10 @@ static QueryDesc *ExecParallelGetQueryDesc(void *seg, DestReceiver *receiver, in knl_u_parallel_context *cxt = (knl_u_parallel_context *)seg; /* Reconstruct leader-supplied PlannedStmt. */ - PlannedStmt *pstmt = (PlannedStmt *)stringToNode(cxt->pwCtx->pstmt_space); + PlannedStmt *pstmt = (PlannedStmt *)stringToNode(cxt->pwCtx->queryInfo.pstmt_space); /* Reconstruct ParamListInfo. */ - ParamListInfo paramLI = RestoreParamList(cxt->pwCtx->param_space, cxt->pwCtx->param_len); + ParamListInfo paramLI = RestoreParamList(cxt->pwCtx->queryInfo.param_space, cxt->pwCtx->queryInfo.param_len); /* * Create a QueryDesc for the query. @@ -553,7 +613,7 @@ void ParallelQueryMain(void *seg) /* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */ knl_u_parallel_context *cxt = (knl_u_parallel_context *)seg; DestReceiver *receiver = ExecParallelGetReceiver(seg); - SharedExecutorInstrumentation *instrumentation = cxt->pwCtx->instrumentation; + SharedExecutorInstrumentation *instrumentation = cxt->pwCtx->queryInfo.instrumentation; if (instrumentation != NULL) instrument_options = instrumentation->instrument_options; QueryDesc *queryDesc = ExecParallelGetQueryDesc(seg, receiver, instrument_options); @@ -568,7 +628,7 @@ void ParallelQueryMain(void *seg) ExecutorFinish(queryDesc); /* Report buffer usage during parallel execution. */ - BufferUsage *buffer_usage = cxt->pwCtx->bufUsage; + BufferUsage *buffer_usage = cxt->pwCtx->queryInfo.bufUsage; InstrEndParallelQuery(&buffer_usage[t_thrd.bgworker_cxt.ParallelWorkerNumber]); /* Report instrumentation data if any instrumentation options are set. */ diff --git a/src/gausskernel/runtime/executor/nodeGather.cpp b/src/gausskernel/runtime/executor/nodeGather.cpp index e11625777..7d5595ec2 100644 --- a/src/gausskernel/runtime/executor/nodeGather.cpp +++ b/src/gausskernel/runtime/executor/nodeGather.cpp @@ -121,7 +121,6 @@ GatherState *ExecInitGather(Gather *node, EState *estate, int eflags) TupleTableSlot *ExecGather(GatherState *node) { TupleTableSlot *fslot = node->funnel_slot; - int i; TupleTableSlot *slot = NULL; TupleTableSlot *resultSlot = NULL; ExprDoneCond isDone; @@ -143,8 +142,6 @@ TupleTableSlot *ExecGather(GatherState *node) * parallel mode is active then we can try to fire up some workers. */ if (gather->num_workers > 0 && IsInParallelMode()) { - bool got_any_worker = false; - /* Initialize the workers required to execute Gather node. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, estate, gather->num_workers); @@ -157,31 +154,28 @@ TupleTableSlot *ExecGather(GatherState *node) LaunchParallelWorkers(pcxt); /* Set up tuple queue readers to read the results. */ - if (pcxt->nworkers > 0) { - node->nreaders = 0; - node->reader = (TupleQueueReader **)palloc(pcxt->nworkers * sizeof(TupleQueueReader *)); + if (pcxt->nworkers_launched > 0) { + ExecParallelCreateReaders(node->pei, fslot->tts_tupleDescriptor); - for (i = 0; i < pcxt->nworkers; ++i) { - if (pcxt->worker[i].bgwhandle == NULL) - continue; + /* Make a working array showing the active readers */ + node->nreaders = pcxt->nworkers_launched; + Size readerSize = node->nreaders * sizeof(TupleQueueReader *); + node->reader = (TupleQueueReader **)palloc(readerSize); - shm_mq_set_handle(node->pei->tqueue[i], pcxt->worker[i].bgwhandle); - node->reader[node->nreaders++] = - CreateTupleQueueReader(node->pei->tqueue[i], fslot->tts_tupleDescriptor); - got_any_worker = true; - } - } + int rc = memcpy_s(node->reader, readerSize, node->pei->reader, readerSize); + securec_check(rc, "", ""); - /* No workers? Then never mind. */ - if (!got_any_worker) { - ExecShutdownGatherWorkers(node); - } else { t_thrd.subrole = BACKGROUND_LEADER; + } else { + /* No workers? Then never mind. */ + node->nreaders = 0; + node->reader = NULL; } + node->nextreader = 0; } /* Run plan locally if no workers or not single-copy. */ - node->need_to_scan_locally = (node->reader == NULL) || + node->need_to_scan_locally = (node->nreaders == 0) || (!gather->single_copy && u_sess->attr.attr_sql.parallel_leader_participation); node->initialized = true; } @@ -261,10 +255,10 @@ static TupleTableSlot *gather_getnext(GatherState *gatherstate) PlanState *outerPlan = outerPlanState(gatherstate); TupleTableSlot *fslot = gatherstate->funnel_slot; - while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) { + while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally) { CHECK_FOR_INTERRUPTS(); - if (gatherstate->reader != NULL) { + if (gatherstate->nreaders > 0) { HeapTuple tup = gather_readnext(gatherstate); if (HeapTupleIsValid(tup)) { (void)ExecStoreTuple(tup, /* tuple to store */ @@ -306,15 +300,13 @@ static HeapTuple gather_readnext(GatherState *gatherstate) HeapTuple tup = TupleQueueReaderNext(reader, true, &readerdone); /* - * If this reader is done, remove it. If all readers are done, - * clean up remaining worker state. + * If this reader is done, remove it from our working array of active + * readers. If all readers are done, we're outta here. */ if (readerdone) { Assert(!tup); - DestroyTupleQueueReader(reader); --gatherstate->nreaders; if (gatherstate->nreaders == 0) { - ExecShutdownGatherWorkers(gatherstate); return NULL; } Size remainSize = sizeof(TupleQueueReader *) * (gatherstate->nreaders - gatherstate->nextreader); @@ -366,9 +358,7 @@ static HeapTuple gather_readnext(GatherState *gatherstate) /* ---------------------------------------------------------------- * ExecShutdownGatherWorkers * - * Destroy the parallel workers. Collect all the stats after - * workers are stopped, else some work done by workers won't be - * accounted. + * Stop all the parallel workers. * ---------------------------------------------------------------- */ static void ExecShutdownGatherWorkers(GatherState *node) @@ -377,14 +367,8 @@ static void ExecShutdownGatherWorkers(GatherState *node) if (node->pei != NULL) ExecParallelFinish(node->pei); - /* Shut down tuple queue readers before shutting down workers. */ - if (node->reader != NULL) { - for (int i = 0; i < node->nreaders; ++i) - DestroyTupleQueueReader(node->reader[i]); - - pfree(node->reader); - node->reader = NULL; - } + /* Flush local copy of reader array */ + pfree_ext(node->reader); } /* ---------------------------------------------------------------- diff --git a/src/gausskernel/runtime/executor/nodeSeqscan.cpp b/src/gausskernel/runtime/executor/nodeSeqscan.cpp index 51ad0a980..8557f8a23 100755 --- a/src/gausskernel/runtime/executor/nodeSeqscan.cpp +++ b/src/gausskernel/runtime/executor/nodeSeqscan.cpp @@ -672,12 +672,12 @@ void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt, int nod knl_u_parallel_context *cxt = (knl_u_parallel_context *)pcxt->seg; /* Here we can't use palloc, cause we have switch to old memctx in ExecInitParallelPlan */ - cxt->pwCtx->pscan[nodeid] = (ParallelHeapScanDesc)MemoryContextAllocZero(cxt->memCtx, node->pscan_len); - heap_parallelscan_initialize(cxt->pwCtx->pscan[nodeid], node->pscan_len, node->ss_currentRelation, + cxt->pwCtx->queryInfo.pscan[nodeid] = (ParallelHeapScanDesc)MemoryContextAllocZero(cxt->memCtx, node->pscan_len); + heap_parallelscan_initialize(cxt->pwCtx->queryInfo.pscan[nodeid], node->pscan_len, node->ss_currentRelation, estate->es_snapshot); - cxt->pwCtx->pscan[nodeid]->plan_node_id = node->ps.plan->plan_node_id; + cxt->pwCtx->queryInfo.pscan[nodeid]->plan_node_id = node->ps.plan->plan_node_id; node->ss_currentScanDesc = - (AbsTblScanDesc)heap_beginscan_parallel(node->ss_currentRelation, cxt->pwCtx->pscan[nodeid]); + (AbsTblScanDesc)heap_beginscan_parallel(node->ss_currentRelation, cxt->pwCtx->queryInfo.pscan[nodeid]); } /* ---------------------------------------------------------------- @@ -691,9 +691,9 @@ void ExecSeqScanInitializeWorker(SeqScanState *node, void *context) ParallelHeapScanDesc pscan = NULL; knl_u_parallel_context *cxt = (knl_u_parallel_context *)context; - for (int i = 0; i < cxt->pwCtx->pscan_num; i++) { - if (node->ps.plan->plan_node_id == cxt->pwCtx->pscan[i]->plan_node_id) { - pscan = cxt->pwCtx->pscan[i]; + for (int i = 0; i < cxt->pwCtx->queryInfo.pscan_num; i++) { + if (node->ps.plan->plan_node_id == cxt->pwCtx->queryInfo.pscan[i]->plan_node_id) { + pscan = cxt->pwCtx->queryInfo.pscan[i]; break; } } diff --git a/src/gausskernel/runtime/executor/tqueue.cpp b/src/gausskernel/runtime/executor/tqueue.cpp index 896d2cb44..26daae8bf 100644 --- a/src/gausskernel/runtime/executor/tqueue.cpp +++ b/src/gausskernel/runtime/executor/tqueue.cpp @@ -452,12 +452,9 @@ TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupled */ void DestroyTupleQueueReader(TupleQueueReader *reader) { - if (reader->queue != NULL) { - shm_mq_detach(reader->queue); - reader->queue = NULL; - } - if (reader->remapinfo != NULL) + if (reader->remapinfo != NULL) { pfree(reader->remapinfo); + } pfree(reader); } diff --git a/src/gausskernel/storage/access/transam/parallel.cpp b/src/gausskernel/storage/access/transam/parallel.cpp index 0e1c11c34..460919556 100644 --- a/src/gausskernel/storage/access/transam/parallel.cpp +++ b/src/gausskernel/storage/access/transam/parallel.cpp @@ -132,8 +132,22 @@ void InitializeParallelDSM(ParallelContext *pcxt, const void *snap) */ pcxt->seg = dsm_create(); - knl_u_parallel_context *cxt = (knl_u_parallel_context *)pcxt->seg; - MemoryContext oldcontext = MemoryContextSwitchTo(cxt->memCtx); + MemoryContext oldcontext = NULL; + knl_u_parallel_context *cxt = NULL; + if (pcxt->seg != NULL) { + cxt = (knl_u_parallel_context *)pcxt->seg; + oldcontext = MemoryContextSwitchTo(cxt->memCtx); + } else { + /* DSM segment is full, use backend private memory, set worker to 0, fallback to serial query */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + pcxt->nworkers = 0; + pcxt->seg = palloc(sizeof(knl_u_parallel_context)); + cxt = (knl_u_parallel_context *)pcxt->seg; + cxt->memCtx = TopMemoryContext; + cxt->pwCtx = (ParallelInfoContext*)palloc0(sizeof(ParallelInfoContext)); + cxt->used = true; + slist_init(&cxt->on_detach); + } /* Initialize fixed-size state in shared memory. */ cxt->pwCtx->database_id = u_sess->proc_cxt.MyDatabaseId; @@ -644,7 +658,7 @@ void DestroyParallelContext(ParallelContext *pcxt) * stored there. */ if (pcxt->seg != NULL) { - dsm_detach(&(pcxt->seg)); + dsm_detach(&(pcxt->seg), pcxt->nworkers > 0 ? true : false); pcxt->seg = NULL; } @@ -1052,6 +1066,7 @@ void ParallelWorkerMain(Datum main_arg) /* Report success. */ pq_putmessage('X', NULL, 0); + pq_stop_redirect_to_shm_mq(); } /* diff --git a/src/gausskernel/storage/ipc/dsm.cpp b/src/gausskernel/storage/ipc/dsm.cpp index 58df80ac9..5a905d60c 100644 --- a/src/gausskernel/storage/ipc/dsm.cpp +++ b/src/gausskernel/storage/ipc/dsm.cpp @@ -31,6 +31,13 @@ #include "utils/memutils.h" #include "postmaster/bgworker_internals.h" +/* Backend-local tracking for on-detach callbacks. */ +typedef struct __dsm_segment_detach_callback { + on_dsm_detach_callback function; + Datum arg; + slist_node node; +} dsm_segment_detach_callback; + #ifdef __USE_NUMA static void RestoreCpuAffinity(cpu_set_t *cpuset) { @@ -44,17 +51,44 @@ static void RestoreCpuAffinity(cpu_set_t *cpuset) } #endif -void dsm_detach(void **seg) +/* + * newMemCtx means whether the memctx is new created or not. + * When newMemCtx is true, we just need to call pfree to free the mem, + * or we can call MemoryContextDelete to jsut delete the whole new created + * memctx. + */ +void dsm_detach(void **seg, bool newMemCtx) { + Assert(seg != NULL); Assert(*seg != NULL); knl_u_parallel_context *ctx = (knl_u_parallel_context *)*seg; + /* + * Invoke registered callbacks. Just in case one of those callbacks + * throws a further error that brings us back here, pop the callback + * before invoking it, to avoid infinite error recursion. + */ + while (!slist_is_empty(&ctx->on_detach)) { + slist_node *node = slist_pop_head_node(&ctx->on_detach); + dsm_segment_detach_callback *cb = slist_container(dsm_segment_detach_callback, node, node); + on_dsm_detach_callback function = cb->function; + Datum arg = cb->arg; + pfree(cb); + + function(seg, arg); + } + + if (newMemCtx) { #ifdef __USE_NUMA - RestoreCpuAffinity(ctx->pwCtx->cpuset); + RestoreCpuAffinity(ctx->pwCtx->cpuset); #endif - MemoryContextDelete(ctx->memCtx); - ctx->memCtx = NULL; - ctx->pwCtx = NULL; - ctx->used = false; + MemoryContextDelete(ctx->memCtx); + ctx->memCtx = NULL; + ctx->pwCtx = NULL; + ctx->used = false; + } else { + pfree(ctx->pwCtx); + pfree(ctx); + } } void *dsm_create(void) @@ -69,11 +103,24 @@ void *dsm_create(void) (void)MemoryContextSwitchTo(oldContext); u_sess->parallel_ctx[i].used = true; + slist_init(&u_sess->parallel_ctx[i].on_detach); return &(u_sess->parallel_ctx[i]); } } - ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES), errmsg("too many dynamic shared memory segments"))); + ereport(WARNING, (errmsg("too many dynamic shared memory segments"))); return NULL; } +/* + * Register an on-detach callback for a dynamic shared memory segment. + */ +void on_dsm_detach(void *seg, on_dsm_detach_callback function, Datum arg) +{ + dsm_segment_detach_callback *cb = + (dsm_segment_detach_callback*)MemoryContextAlloc(TopMemoryContext, sizeof(dsm_segment_detach_callback)); + cb->function = function; + cb->arg = arg; + knl_u_parallel_context *ctx = (knl_u_parallel_context *)seg; + slist_push_head(&ctx->on_detach, &cb->node); +} diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 947d57067..3a55efe58 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -1,38 +1,41 @@ -/* -------------------------------------------------------------------- - * execParallel.h - * POSTGRES parallel execution interface - * - * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * IDENTIFICATION - * src/include/executor/execParallel.h - * -------------------------------------------------------------------- - */ - -#ifndef EXECPARALLEL_H -#define EXECPARALLEL_H - -#include "access/parallel.h" -#include "nodes/execnodes.h" -#include "nodes/parsenodes.h" -#include "nodes/plannodes.h" - -typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation; - -typedef struct ParallelExecutorInfo { - PlanState *planstate; - ParallelContext *pcxt; - BufferUsage *buffer_usage; - SharedExecutorInstrumentation *instrumentation; - shm_mq_handle **tqueue; - bool finished; -} ParallelExecutorInfo; - -extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers); -extern void ExecParallelFinish(ParallelExecutorInfo *pei); -extern void ExecParallelCleanup(ParallelExecutorInfo *pei); -extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); - -extern void ParallelQueryMain(void *seg); -#endif /* EXECPARALLEL_H */ +/* -------------------------------------------------------------------- + * execParallel.h + * POSTGRES parallel execution interface + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execParallel.h + * -------------------------------------------------------------------- + */ + +#ifndef EXECPARALLEL_H +#define EXECPARALLEL_H + +#include "access/parallel.h" +#include "nodes/execnodes.h" +#include "nodes/parsenodes.h" +#include "nodes/plannodes.h" + +typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation; + +typedef struct ParallelExecutorInfo { + PlanState *planstate; /* plan subtree we're running in parallel */ + ParallelContext *pcxt; /* parallel context we're using */ + BufferUsage *buffer_usage; /* points to bufusage area in DSM */ + SharedExecutorInstrumentation *instrumentation; /* optional */ + bool finished; /* set true by ExecParallelFinish */ + /* These two arrays have pcxt->nworkers_launched entries: */ + shm_mq_handle **tqueue; /* tuple queues for worker output */ + struct TupleQueueReader **reader; /* tuple reader/writer support */ +} ParallelExecutorInfo; + +extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers); +extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei, TupleDesc tupDesc); +extern void ExecParallelFinish(ParallelExecutorInfo *pei); +extern void ExecParallelCleanup(ParallelExecutorInfo *pei); +extern void ExecParallelReinitialize(ParallelExecutorInfo *pei); + +extern void ParallelQueryMain(void *seg); +#endif /* EXECPARALLEL_H */ diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index aacf84b43..ef472f1c6 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -2046,6 +2046,26 @@ typedef struct knl_u_ext_fdw_context { /* Info need to pass from leader to worker */ struct ParallelHeapScanDescData; typedef uint64 XLogRecPtr; +typedef struct ParallelQueryInfo { + struct SharedExecutorInstrumentation *instrumentation; + BufferUsage *bufUsage; + char *tupleQueue; + char *pstmt_space; + char *param_space; + Size param_len; + int pscan_num; + ParallelHeapScanDescData **pscan; +} ParallelQueryInfo; + +struct BTShared; +struct SharedSort; +typedef struct ParallelBtreeInfo { + char *queryText; + BTShared *btShared; + SharedSort *sharedSort; + SharedSort *sharedSort2; +} ParallelBtreeInfo; + typedef struct ParallelInfoContext { Oid database_id; Oid authenticated_user_id; @@ -2060,11 +2080,6 @@ typedef struct ParallelInfoContext { BackendId parallel_master_backend_id; TimestampTz xact_ts; TimestampTz stmt_ts; - char *pstmt_space; - char *param_space; - Size param_len; - int pscan_num; - ParallelHeapScanDescData **pscan; int usedComboCids; struct ComboCidKeyData *comboCids; char *tsnapspace; @@ -2092,14 +2107,17 @@ typedef struct ParallelInfoContext { TransactionId *ParallelCurrentXids; char *library_name; char *function_name; - BufferUsage *bufUsage; - char *tupleQueue; - struct SharedExecutorInstrumentation *instrumentation; char *namespace_search_path; #ifdef __USE_NUMA int numaNode; cpu_set_t *cpuset; #endif + + union { + ParallelQueryInfo queryInfo; /* parameters for parallel query only */ + ParallelBtreeInfo btreeInfo; /* parameters for parallel create index(btree) only */ + }; + /* Mutex protects remaining fields. */ slock_t mutex; /* Maximum XactLastRecEnd of any worker. */ @@ -2108,8 +2126,9 @@ typedef struct ParallelInfoContext { typedef struct knl_u_parallel_context { ParallelInfoContext *pwCtx; - MemoryContext memCtx; - bool used; + MemoryContext memCtx; /* memory context used to malloc memory */ + slist_head on_detach; /* On-detach callbacks. */ + bool used; /* used or not */ } knl_u_parallel_context; enum knl_session_status { diff --git a/src/include/storage/dsm.h b/src/include/storage/dsm.h index f07111d78..c23a38bb5 100644 --- a/src/include/storage/dsm.h +++ b/src/include/storage/dsm.h @@ -1,48 +1,47 @@ -/* ------------------------------------------------------------------------- - * - * dsm.h - * manage dynamic shared memory segments - * - * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd - * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * src/include/storage/dsm.h - * - * ------------------------------------------------------------------------- - */ -#ifndef DSM_H -#define DSM_H - -#define DSM_MAX_ITEM_PER_QUERY 8 - -/* Startup and shutdown functions. */ -#define dsm_cleanup_using_control_segment(oldControlHandle) -#define dsm_postmaster_startup(shmemHeader) -#define dsm_backend_shutdown -#define dsm_detach_all -#define dsm_set_control_handle(dsmHandle) - -/* Functions that create or remove mappings. */ -extern void *dsm_create(void); -#define dsm_attach(dsmHandle) -extern void dsm_detach(void **seg); - -/* Resource management functions. */ -#define dsm_pin_mapping(dsmSegment) -#define dsm_unpin_mapping(dsmSegment) -#define dsm_pin_segment(dsmSegment) -#define dsm_unpin_segment(dsmHandle) -#define dsm_find_mapping(dsmHandle) - -/* Informational functions. */ -#define dsm_segment_address(dsmSegment) -#define dsm_segment_map_length(dsmSegment) -#define dsm_segment_handle(dsmSegment) - -/* Cleanup hooks. */ -#define on_dsm_detach(dsmSegment, callbackFunc, arg) -#define cancel_on_dsm_detach(dsmSegment, callbackFunc, arg) -#define reset_on_dsm_detach - +/* ------------------------------------------------------------------------- + * + * dsm.h + * manage dynamic shared memory segments + * + * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/dsm.h + * + * ------------------------------------------------------------------------- + */ +#ifndef DSM_H +#define DSM_H + +#define DSM_MAX_ITEM_PER_QUERY 8 + +/* Startup and shutdown functions. */ +#define dsm_cleanup_using_control_segment(oldControlHandle) +#define dsm_postmaster_startup(shmemHeader) +#define dsm_backend_shutdown +#define dsm_detach_all +#define dsm_set_control_handle(dsmHandle) + +/* Functions that create or remove mappings. */ +extern void *dsm_create(void); +#define dsm_attach(dsmHandle) +extern void dsm_detach(void **seg, bool newMemCtx); + +/* Resource management functions. */ +#define dsm_pin_mapping(dsmSegment) +#define dsm_unpin_mapping(dsmSegment) +#define dsm_pin_segment(dsmSegment) +#define dsm_unpin_segment(dsmHandle) +#define dsm_find_mapping(dsmHandle) + +/* Informational functions. */ +#define dsm_segment_address(dsmSegment) +#define dsm_segment_map_length(dsmSegment) +#define dsm_segment_handle(dsmSegment) + +/* Cleanup hooks. */ +typedef void (*on_dsm_detach_callback) (void *seg, Datum arg); +extern void on_dsm_detach(void *seg, on_dsm_detach_callback function, Datum arg); + #endif /* DSM_H */ \ No newline at end of file