diff --git a/src/common/backend/utils/resowner/resowner.cpp b/src/common/backend/utils/resowner/resowner.cpp index 333e9e283..dd62b1d97 100755 --- a/src/common/backend/utils/resowner/resowner.cpp +++ b/src/common/backend/utils/resowner/resowner.cpp @@ -39,7 +39,9 @@ #include "catalog/pg_hashbucket_fn.h" /* - * ResourceOwner objects look like this + * ResourceOwner objects look like this. When tracking new types of resource, + * you must at least add the 'Remember' interface for that resource and adapt + * the 'ResourceOwnerConcat' function. */ typedef struct ResourceOwnerData { ResourceOwner parent; /* NULL if no parent (toplevel owner) */ @@ -542,6 +544,174 @@ void ResourceOwnerDelete(ResourceOwner owner) ResourceOwnerFreeOwner(owner, true); } +/* + * Part 1 of 'ResourceOwnerConcat'. Concatenate the top 12 resources of two owners. + */ +static void ResourceOwnerConcatPart1(ResourceOwner target, ResourceOwner source) +{ + int i; + + for (i = 0; i < source->nbuffers; i++) { + ResourceOwnerEnlargeBuffers(target); + ResourceOwnerRememberBuffer(target, source->buffers[i]); + } + + for (i = 0; i < source->nlocalcatclist; i++) { + ResourceOwnerEnlargeLocalCatCList(target); + ResourceOwnerRememberLocalCatCList(target, source->localcatclists[i]); + } + + for (i = 0; i < source->nlocalcatctup; i++) { + ResourceOwnerEnlargeLocalCatCTup(target); + ResourceOwnerRememberLocalCatCTup(target, source->localcatctups[i]); + } + + for (i = 0; i < source->nglobalcatctup; i++) { + ResourceOwnerEnlargeGlobalCatCTup(target); + ResourceOwnerRememberGlobalCatCTup(target, source->globalcatctups[i]); + } + + for (i = 0; i < source->nglobalcatclist; i++) { + ResourceOwnerEnlargeGlobalCatCList(target); + ResourceOwnerRememberGlobalCatCList(target, source->globalcatclists[i]); + } + + for (i = 0; i < source->nglobalbaseentry; i++) { + ResourceOwnerEnlargeGlobalBaseEntry(target); + ResourceOwnerRememberGlobalBaseEntry(target, source->globalbaseentries[i]); + } + + for (i = 0; i < source->nglobaldbentry; i++) { + ResourceOwnerEnlargeGlobalDBEntry(target); + ResourceOwnerRememberGlobalDBEntry(target, source->globaldbentries[i]); + } + + for (i = 0; i < source->nglobalisexclusive; i++) { + ResourceOwnerEnlargeGlobalIsExclusive(target); + ResourceOwnerRememberGlobalIsExclusive(target, source->globalisexclusives[i]); + } + + for (i = 0; i < source->ncatrefs; i++) { + ResourceOwnerEnlargeCatCacheRefs(target); + ResourceOwnerRememberCatCacheRef(target, source->catrefs[i]); + } + + for (i = 0; i < source->ncatlistrefs; i++) { + ResourceOwnerEnlargeCatCacheListRefs(target); + ResourceOwnerRememberCatCacheListRef(target, source->catlistrefs[i]); + } + + for (i = 0; i < source->nrelrefs; i++) { + ResourceOwnerEnlargeRelationRefs(target); + ResourceOwnerRememberRelationRef(target, source->relrefs[i]); + } + + for (i = 0; i < source->npartrefs; i++) { + ResourceOwnerEnlargePartitionRefs(target); + ResourceOwnerRememberPartitionRef(target, source->partrefs[i]); + } +} + +/* + * Part 2 of 'ResourceOwnerConcat'. Concatenate the remaining resources of two owners. + */ +static void ResourceOwnerConcatPart2(ResourceOwner target, ResourceOwner source) +{ + int i; + + for (i = 0; i < source->nfakerelrefs; i++) { + dlist_push_tail(&(target->fakerelrefs_list), dlist_pop_head_node(&(source->fakerelrefs_list))); + target->nfakerelrefs++; + } + + for (i = 0; i < source->nfakepartrefs; i++) { + ResourceOwnerEnlargeFakepartRefs(target); + ResourceOwnerRememberFakepartRef(target, source->fakepartrefs[i]); + } + + for (i = 0; i < source->nplanrefs; i++) { + ResourceOwnerEnlargePlanCacheRefs(target); + ResourceOwnerRememberPlanCacheRef(target, source->planrefs[i]); + } + + for (i = 0; i < source->ntupdescs; i++) { + ResourceOwnerEnlargeTupleDescs(target); + ResourceOwnerRememberTupleDesc(target, source->tupdescs[i]); + } + + for (i = 0; i < source->nsnapshots; i++) { + ResourceOwnerEnlargeSnapshots(target); + ResourceOwnerRememberSnapshot(target, source->snapshots[i]); + } + + for (i = 0; i < source->nfiles; i++) { + ResourceOwnerEnlargeFiles(target); + ResourceOwnerRememberFile(target, source->files[i]); + } + + for (i = 0; i < source->nDataCacheSlots; i++) { + ResourceOwnerEnlargeDataCacheSlot(target); + ResourceOwnerRememberDataCacheSlot(target, source->dataCacheSlots[i]); + } + + for (i = 0; i < source->nMetaCacheSlots; i++) { + ResourceOwnerEnlargeMetaCacheSlot(target); + ResourceOwnerRememberMetaCacheSlot(target, source->metaCacheSlots[i]); + } + + for (i = 0; i < source->nPthreadMutex; i++) { + ResourceOwnerEnlargePthreadMutex(target); + ResourceOwnerRememberPthreadMutex(target, source->pThdMutexs[i]); + } + + for (i = 0; i < source->nPthreadRWlock; i++) { + ResourceOwnerEnlargePthreadRWlock(target); + ResourceOwnerRememberPthreadRWlock(target, source->pThdRWlocks[i]); + } + + for (i = 0; i < source->npartmaprefs; i++) { + ResourceOwnerEnlargePartitionMapRefs(target); + ResourceOwnerRememberPartitionMapRef(target, source->partmaprefs[i]); + } + + for (i = 0; i < source->nglobalMemContext; i++) { + ResourceOwnerEnlargeGMemContext(target); + ResourceOwnerRememberGMemContext(target, source->globalMemContexts[i]); + } +} + +/* ResourceOwnerConcat + * Concatenate two owners. + * + * The resources traced by the 'source' are placed in the 'target' for tracing. + * The advantage is that the memory occupied by the 'source' owner can be released + * to reduce the memory consumed by tracing resources. When using a stream-plan, + * this is useful for preventing "memory is temporarily unavailable" error when + * executing a large number of SQLs in a single transaction/procedure. + * + * Note: After the invoking is complete, the memory of the 'source' should be release. + */ +void ResourceOwnerConcat(ResourceOwner target, ResourceOwner source) +{ + Assert(target && source); + /* + * When modifying the structure of ResourceOwnerData, note that the ResourceOwnerConcat + * function needs to be adapted when tracing new types of resources. + */ + Assert(sizeof(ResourceOwnerData) == 448); /* The current size of ResourceOwnerData is 448 */ + + while (source->firstchild != NULL) { + ResourceOwnerConcat(target, source->firstchild); + } + + /* + * ResourceOwner traces too many resources. To reduce cyclomatic complexity, + * the Concatenate operation is divided into two parts. + */ + ResourceOwnerConcatPart1(target, source); + ResourceOwnerConcatPart2(target, source); +} + /* * Fetch parent of a ResourceOwner (returns NULL if top-level owner) */ @@ -575,6 +745,14 @@ ResourceOwner ResourceOwnerGetFirstChild(ResourceOwner owner) return owner->firstchild; } +/* + * Fetch memory context of a ResourceOwner + */ +MemoryContext ResourceOwnerGetMemCxt(ResourceOwner owner) +{ + return owner->memCxt; +} + /* * Reassign a ResourceOwner to have a new parent */ diff --git a/src/gausskernel/optimizer/plan/pgxcplan_single.cpp b/src/gausskernel/optimizer/plan/pgxcplan_single.cpp index 7e8d8a0b5..1f454fc17 100755 --- a/src/gausskernel/optimizer/plan/pgxcplan_single.cpp +++ b/src/gausskernel/optimizer/plan/pgxcplan_single.cpp @@ -67,6 +67,43 @@ #include "utils/fmgroids.h" #include "access/heapam.h" +/* + * Check whether the current statement supports Stream based on the status of 'context' and 'query'. + * If Stream is supported, a copy of the 'query' is returned as a backup in case generating a plan + * with Stream fails. + */ +static Query* check_shippable(bool *stream_unsupport, Query* query, shipping_context* context) +{ + if (u_sess->attr.attr_sql.rewrite_rule & PARTIAL_PUSH) { + *stream_unsupport = !context->query_shippable; + } else { + *stream_unsupport = !context->global_shippable; + } + + if (u_sess->attr.attr_sql.enable_dngather) { + u_sess->opt_cxt.is_dngather_support = !context->disable_dn_gather; + } else { + u_sess->opt_cxt.is_dngather_support = false; + } + + /* single node do not support parallel query in cursor */ + if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) { + *stream_unsupport = true; + } + + if (*stream_unsupport || !IS_STREAM) { + output_unshipped_log(); + set_stream_off(); + } else { + /* + * make a copy of query, so we can retry to create an unshippable plan + * when we fail to generate a stream plan + */ + return (Query*)copyObject(query); + } + return NULL; +} + PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundParams) { PlannedStmt* result = NULL; @@ -91,33 +128,7 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa (void)stream_walker((Node*)query, (void*)(&context)); disable_unshipped_log(query, &context); - if (u_sess->attr.attr_sql.rewrite_rule & PARTIAL_PUSH) { - stream_unsupport = !context.query_shippable; - } else { - stream_unsupport = !context.global_shippable; - } - - if (u_sess->attr.attr_sql.enable_dngather) { - u_sess->opt_cxt.is_dngather_support = !context.disable_dn_gather; - } else { - u_sess->opt_cxt.is_dngather_support = false; - } - - /* single node do not support parallel query in cursor */ - if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) { - stream_unsupport = true; - } - - if (stream_unsupport || !IS_STREAM) { - output_unshipped_log(); - set_stream_off(); - } else { - /* - * make a copy of query, so we can retry to create an unshippable plan - * when we fail to generate a stream plan - */ - re_query = (Query*)copyObject(query); - } + re_query = check_shippable(&stream_unsupport, query, &context); } else { errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason, NOTPLANSHIPPING_LENGTH, @@ -141,9 +152,26 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa */ MemoryContext current_context = CurrentMemoryContext; ResourceOwner currentOwner = t_thrd.utils_cxt.CurrentResourceOwner; - ResourceOwner tempOwner = ResourceOwnerCreate(t_thrd.utils_cxt.CurrentResourceOwner, "pgxc_planner", - SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_OPTIMIZER)); - t_thrd.utils_cxt.CurrentResourceOwner = tempOwner; + ResourceOwner tempOwner = NULL; + /* + * If the stream-plan is not used, the currentOwner is used to trace resources instead of + * applying for a temporary owner. This prevents the "memory temporarily unavailable" error + * caused by memory stacking. + */ + if (IS_STREAM_PLAN) { + /* + * If the stream-plan is used, a temporary owner is used to trace resources. This helps release + * resources in a unified manner when a stream-plan fails to be generated, preventing resource + * leakage. + */ + tempOwner = ResourceOwnerCreate(t_thrd.utils_cxt.CurrentResourceOwner, "pgxc_planner", + /* + * The memory context of the temporary owner must be the same as the currentOwner to ensure + * that they have the same lifecycle + */ + ResourceOwnerGetMemCxt(currentOwner)); + t_thrd.utils_cxt.CurrentResourceOwner = tempOwner; + } /* we need Coordinator for evaluation, invoke standard planner */ PG_TRY(); @@ -175,8 +203,19 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa result->ng_use_planA = use_planA; ReSetNgQueryMem(result); } - /* release resource applied in standard_planner */ - t_thrd.utils_cxt.CurrentResourceOwner = currentOwner; + /* If tempOwner is not NULL, the current plan is a stream-plan using the SMP technology. */ + if (tempOwner != NULL) { + /* + * When the stream-plan is successfully generated, the temporary owner tracks the + * resources opened during the plan generation. Now we put the resources of the + * stream-plan into the currentOwner for tracking, and release the tempOwner to + * further reduce the memory. This greatly avoid the "memory temporarily unavailable" + * error, caused by a large amount of SQLs being executed in a transaction/procedure. + */ + ResourceOwnerConcat(currentOwner, tempOwner); + t_thrd.utils_cxt.CurrentResourceOwner = currentOwner; + ResourceOwnerDelete(tempOwner); + } } PG_CATCH(); { @@ -198,11 +237,13 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa * Release resources applied in standard_planner, release the tempOwner and reinstate the currentOwner * before PG_RE_THROW(). */ - ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false); - ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_LOCKS, false, false); - ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false); - t_thrd.utils_cxt.CurrentResourceOwner = currentOwner; - ResourceOwnerDelete(tempOwner); + if (tempOwner != NULL) { + ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false); + ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_LOCKS, false, false); + ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false); + t_thrd.utils_cxt.CurrentResourceOwner = currentOwner; + ResourceOwnerDelete(tempOwner); + } MemoryContextSwitchTo(ecxt); PG_RE_THROW(); } @@ -216,11 +257,13 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa } /* release resource applied in standard_planner of the PG_TRY. */ - ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false); - ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_LOCKS, false, false); - ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false); - t_thrd.utils_cxt.CurrentResourceOwner = currentOwner; - ResourceOwnerDelete(tempOwner); + if (tempOwner != NULL) { + ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false); + ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_LOCKS, false, false); + ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false); + t_thrd.utils_cxt.CurrentResourceOwner = currentOwner; + ResourceOwnerDelete(tempOwner); + } #ifdef STREAMPLAN if (OidIsValid(lc_replan_nodegroup)) { diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h index 14c8fa26e..306d82485 100755 --- a/src/include/utils/resowner.h +++ b/src/include/utils/resowner.h @@ -64,10 +64,12 @@ typedef void (*ResourceReleaseCallback)(ResourceReleasePhase phase, bool isCommi extern ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char* name, MemoryContext memCxt); extern void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel); extern void ResourceOwnerDelete(ResourceOwner owner); +extern void ResourceOwnerConcat(ResourceOwner target, ResourceOwner source); extern ResourceOwner ResourceOwnerGetParent(ResourceOwner owner); extern ResourceOwner ResourceOwnerGetNextChild(ResourceOwner owner); extern const char * ResourceOwnerGetName(ResourceOwner owner); extern ResourceOwner ResourceOwnerGetFirstChild(ResourceOwner owner); +extern MemoryContext ResourceOwnerGetMemCxt(ResourceOwner owner); extern void ResourceOwnerNewParent(ResourceOwner owner, ResourceOwner newparent); /* support for buffer refcount management */