The resourceOwner of the SMP is optimized.

Offering: openGaussDev

More detail:
  1. If the SMP is not used, no memory is applied for the tempOwner.
  2. If SMP is used, after the plan is successful, 
     CurrentResourceOwner to reduce the memory usage.

Match-id-8d0c6a3065d99d76fe103c0f82dc97c93e14dfd2
This commit is contained in:
openGaussDev
2022-03-03 15:36:34 +08:00
committed by yanghao
parent ea8d14f7c4
commit d885776b8f
3 changed files with 266 additions and 43 deletions

View File

@ -39,7 +39,9 @@
#include "catalog/pg_hashbucket_fn.h"
/*
* ResourceOwner objects look like this
* ResourceOwner objects look like this. When tracking new types of resource,
* you must at least add the 'Remember' interface for that resource and adapt
* the 'ResourceOwnerConcat' function.
*/
typedef struct ResourceOwnerData {
ResourceOwner parent; /* NULL if no parent (toplevel owner) */
@ -542,6 +544,174 @@ void ResourceOwnerDelete(ResourceOwner owner)
ResourceOwnerFreeOwner(owner, true);
}
/*
* Part 1 of 'ResourceOwnerConcat'. Concatenate the top 12 resources of two owners.
*/
static void ResourceOwnerConcatPart1(ResourceOwner target, ResourceOwner source)
{
int i;
for (i = 0; i < source->nbuffers; i++) {
ResourceOwnerEnlargeBuffers(target);
ResourceOwnerRememberBuffer(target, source->buffers[i]);
}
for (i = 0; i < source->nlocalcatclist; i++) {
ResourceOwnerEnlargeLocalCatCList(target);
ResourceOwnerRememberLocalCatCList(target, source->localcatclists[i]);
}
for (i = 0; i < source->nlocalcatctup; i++) {
ResourceOwnerEnlargeLocalCatCTup(target);
ResourceOwnerRememberLocalCatCTup(target, source->localcatctups[i]);
}
for (i = 0; i < source->nglobalcatctup; i++) {
ResourceOwnerEnlargeGlobalCatCTup(target);
ResourceOwnerRememberGlobalCatCTup(target, source->globalcatctups[i]);
}
for (i = 0; i < source->nglobalcatclist; i++) {
ResourceOwnerEnlargeGlobalCatCList(target);
ResourceOwnerRememberGlobalCatCList(target, source->globalcatclists[i]);
}
for (i = 0; i < source->nglobalbaseentry; i++) {
ResourceOwnerEnlargeGlobalBaseEntry(target);
ResourceOwnerRememberGlobalBaseEntry(target, source->globalbaseentries[i]);
}
for (i = 0; i < source->nglobaldbentry; i++) {
ResourceOwnerEnlargeGlobalDBEntry(target);
ResourceOwnerRememberGlobalDBEntry(target, source->globaldbentries[i]);
}
for (i = 0; i < source->nglobalisexclusive; i++) {
ResourceOwnerEnlargeGlobalIsExclusive(target);
ResourceOwnerRememberGlobalIsExclusive(target, source->globalisexclusives[i]);
}
for (i = 0; i < source->ncatrefs; i++) {
ResourceOwnerEnlargeCatCacheRefs(target);
ResourceOwnerRememberCatCacheRef(target, source->catrefs[i]);
}
for (i = 0; i < source->ncatlistrefs; i++) {
ResourceOwnerEnlargeCatCacheListRefs(target);
ResourceOwnerRememberCatCacheListRef(target, source->catlistrefs[i]);
}
for (i = 0; i < source->nrelrefs; i++) {
ResourceOwnerEnlargeRelationRefs(target);
ResourceOwnerRememberRelationRef(target, source->relrefs[i]);
}
for (i = 0; i < source->npartrefs; i++) {
ResourceOwnerEnlargePartitionRefs(target);
ResourceOwnerRememberPartitionRef(target, source->partrefs[i]);
}
}
/*
* Part 2 of 'ResourceOwnerConcat'. Concatenate the remaining resources of two owners.
*/
static void ResourceOwnerConcatPart2(ResourceOwner target, ResourceOwner source)
{
int i;
for (i = 0; i < source->nfakerelrefs; i++) {
dlist_push_tail(&(target->fakerelrefs_list), dlist_pop_head_node(&(source->fakerelrefs_list)));
target->nfakerelrefs++;
}
for (i = 0; i < source->nfakepartrefs; i++) {
ResourceOwnerEnlargeFakepartRefs(target);
ResourceOwnerRememberFakepartRef(target, source->fakepartrefs[i]);
}
for (i = 0; i < source->nplanrefs; i++) {
ResourceOwnerEnlargePlanCacheRefs(target);
ResourceOwnerRememberPlanCacheRef(target, source->planrefs[i]);
}
for (i = 0; i < source->ntupdescs; i++) {
ResourceOwnerEnlargeTupleDescs(target);
ResourceOwnerRememberTupleDesc(target, source->tupdescs[i]);
}
for (i = 0; i < source->nsnapshots; i++) {
ResourceOwnerEnlargeSnapshots(target);
ResourceOwnerRememberSnapshot(target, source->snapshots[i]);
}
for (i = 0; i < source->nfiles; i++) {
ResourceOwnerEnlargeFiles(target);
ResourceOwnerRememberFile(target, source->files[i]);
}
for (i = 0; i < source->nDataCacheSlots; i++) {
ResourceOwnerEnlargeDataCacheSlot(target);
ResourceOwnerRememberDataCacheSlot(target, source->dataCacheSlots[i]);
}
for (i = 0; i < source->nMetaCacheSlots; i++) {
ResourceOwnerEnlargeMetaCacheSlot(target);
ResourceOwnerRememberMetaCacheSlot(target, source->metaCacheSlots[i]);
}
for (i = 0; i < source->nPthreadMutex; i++) {
ResourceOwnerEnlargePthreadMutex(target);
ResourceOwnerRememberPthreadMutex(target, source->pThdMutexs[i]);
}
for (i = 0; i < source->nPthreadRWlock; i++) {
ResourceOwnerEnlargePthreadRWlock(target);
ResourceOwnerRememberPthreadRWlock(target, source->pThdRWlocks[i]);
}
for (i = 0; i < source->npartmaprefs; i++) {
ResourceOwnerEnlargePartitionMapRefs(target);
ResourceOwnerRememberPartitionMapRef(target, source->partmaprefs[i]);
}
for (i = 0; i < source->nglobalMemContext; i++) {
ResourceOwnerEnlargeGMemContext(target);
ResourceOwnerRememberGMemContext(target, source->globalMemContexts[i]);
}
}
/* ResourceOwnerConcat
* Concatenate two owners.
*
* The resources traced by the 'source' are placed in the 'target' for tracing.
* The advantage is that the memory occupied by the 'source' owner can be released
* to reduce the memory consumed by tracing resources. When using a stream-plan,
* this is useful for preventing "memory is temporarily unavailable" error when
* executing a large number of SQLs in a single transaction/procedure.
*
* Note: After the invoking is complete, the memory of the 'source' should be release.
*/
void ResourceOwnerConcat(ResourceOwner target, ResourceOwner source)
{
Assert(target && source);
/*
* When modifying the structure of ResourceOwnerData, note that the ResourceOwnerConcat
* function needs to be adapted when tracing new types of resources.
*/
Assert(sizeof(ResourceOwnerData) == 448); /* The current size of ResourceOwnerData is 448 */
while (source->firstchild != NULL) {
ResourceOwnerConcat(target, source->firstchild);
}
/*
* ResourceOwner traces too many resources. To reduce cyclomatic complexity,
* the Concatenate operation is divided into two parts.
*/
ResourceOwnerConcatPart1(target, source);
ResourceOwnerConcatPart2(target, source);
}
/*
* Fetch parent of a ResourceOwner (returns NULL if top-level owner)
*/
@ -575,6 +745,14 @@ ResourceOwner ResourceOwnerGetFirstChild(ResourceOwner owner)
return owner->firstchild;
}
/*
* Fetch memory context of a ResourceOwner
*/
MemoryContext ResourceOwnerGetMemCxt(ResourceOwner owner)
{
return owner->memCxt;
}
/*
* Reassign a ResourceOwner to have a new parent
*/

View File

@ -67,6 +67,43 @@
#include "utils/fmgroids.h"
#include "access/heapam.h"
/*
* Check whether the current statement supports Stream based on the status of 'context' and 'query'.
* If Stream is supported, a copy of the 'query' is returned as a backup in case generating a plan
* with Stream fails.
*/
static Query* check_shippable(bool *stream_unsupport, Query* query, shipping_context* context)
{
if (u_sess->attr.attr_sql.rewrite_rule & PARTIAL_PUSH) {
*stream_unsupport = !context->query_shippable;
} else {
*stream_unsupport = !context->global_shippable;
}
if (u_sess->attr.attr_sql.enable_dngather) {
u_sess->opt_cxt.is_dngather_support = !context->disable_dn_gather;
} else {
u_sess->opt_cxt.is_dngather_support = false;
}
/* single node do not support parallel query in cursor */
if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) {
*stream_unsupport = true;
}
if (*stream_unsupport || !IS_STREAM) {
output_unshipped_log();
set_stream_off();
} else {
/*
* make a copy of query, so we can retry to create an unshippable plan
* when we fail to generate a stream plan
*/
return (Query*)copyObject(query);
}
return NULL;
}
PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundParams)
{
PlannedStmt* result = NULL;
@ -91,33 +128,7 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa
(void)stream_walker((Node*)query, (void*)(&context));
disable_unshipped_log(query, &context);
if (u_sess->attr.attr_sql.rewrite_rule & PARTIAL_PUSH) {
stream_unsupport = !context.query_shippable;
} else {
stream_unsupport = !context.global_shippable;
}
if (u_sess->attr.attr_sql.enable_dngather) {
u_sess->opt_cxt.is_dngather_support = !context.disable_dn_gather;
} else {
u_sess->opt_cxt.is_dngather_support = false;
}
/* single node do not support parallel query in cursor */
if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) {
stream_unsupport = true;
}
if (stream_unsupport || !IS_STREAM) {
output_unshipped_log();
set_stream_off();
} else {
/*
* make a copy of query, so we can retry to create an unshippable plan
* when we fail to generate a stream plan
*/
re_query = (Query*)copyObject(query);
}
re_query = check_shippable(&stream_unsupport, query, &context);
} else {
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
@ -141,9 +152,26 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa
*/
MemoryContext current_context = CurrentMemoryContext;
ResourceOwner currentOwner = t_thrd.utils_cxt.CurrentResourceOwner;
ResourceOwner tempOwner = ResourceOwnerCreate(t_thrd.utils_cxt.CurrentResourceOwner, "pgxc_planner",
SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_OPTIMIZER));
t_thrd.utils_cxt.CurrentResourceOwner = tempOwner;
ResourceOwner tempOwner = NULL;
/*
* If the stream-plan is not used, the currentOwner is used to trace resources instead of
* applying for a temporary owner. This prevents the "memory temporarily unavailable" error
* caused by memory stacking.
*/
if (IS_STREAM_PLAN) {
/*
* If the stream-plan is used, a temporary owner is used to trace resources. This helps release
* resources in a unified manner when a stream-plan fails to be generated, preventing resource
* leakage.
*/
tempOwner = ResourceOwnerCreate(t_thrd.utils_cxt.CurrentResourceOwner, "pgxc_planner",
/*
* The memory context of the temporary owner must be the same as the currentOwner to ensure
* that they have the same lifecycle
*/
ResourceOwnerGetMemCxt(currentOwner));
t_thrd.utils_cxt.CurrentResourceOwner = tempOwner;
}
/* we need Coordinator for evaluation, invoke standard planner */
PG_TRY();
@ -175,8 +203,19 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa
result->ng_use_planA = use_planA;
ReSetNgQueryMem(result);
}
/* release resource applied in standard_planner */
t_thrd.utils_cxt.CurrentResourceOwner = currentOwner;
/* If tempOwner is not NULL, the current plan is a stream-plan using the SMP technology. */
if (tempOwner != NULL) {
/*
* When the stream-plan is successfully generated, the temporary owner tracks the
* resources opened during the plan generation. Now we put the resources of the
* stream-plan into the currentOwner for tracking, and release the tempOwner to
* further reduce the memory. This greatly avoid the "memory temporarily unavailable"
* error, caused by a large amount of SQLs being executed in a transaction/procedure.
*/
ResourceOwnerConcat(currentOwner, tempOwner);
t_thrd.utils_cxt.CurrentResourceOwner = currentOwner;
ResourceOwnerDelete(tempOwner);
}
}
PG_CATCH();
{
@ -198,11 +237,13 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa
* Release resources applied in standard_planner, release the tempOwner and reinstate the currentOwner
* before PG_RE_THROW().
*/
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_LOCKS, false, false);
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false);
t_thrd.utils_cxt.CurrentResourceOwner = currentOwner;
ResourceOwnerDelete(tempOwner);
if (tempOwner != NULL) {
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_LOCKS, false, false);
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false);
t_thrd.utils_cxt.CurrentResourceOwner = currentOwner;
ResourceOwnerDelete(tempOwner);
}
MemoryContextSwitchTo(ecxt);
PG_RE_THROW();
}
@ -216,11 +257,13 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa
}
/* release resource applied in standard_planner of the PG_TRY. */
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_LOCKS, false, false);
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false);
t_thrd.utils_cxt.CurrentResourceOwner = currentOwner;
ResourceOwnerDelete(tempOwner);
if (tempOwner != NULL) {
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_LOCKS, false, false);
ResourceOwnerRelease(tempOwner, RESOURCE_RELEASE_AFTER_LOCKS, false, false);
t_thrd.utils_cxt.CurrentResourceOwner = currentOwner;
ResourceOwnerDelete(tempOwner);
}
#ifdef STREAMPLAN
if (OidIsValid(lc_replan_nodegroup)) {

View File

@ -64,10 +64,12 @@ typedef void (*ResourceReleaseCallback)(ResourceReleasePhase phase, bool isCommi
extern ResourceOwner ResourceOwnerCreate(ResourceOwner parent, const char* name, MemoryContext memCxt);
extern void ResourceOwnerRelease(ResourceOwner owner, ResourceReleasePhase phase, bool isCommit, bool isTopLevel);
extern void ResourceOwnerDelete(ResourceOwner owner);
extern void ResourceOwnerConcat(ResourceOwner target, ResourceOwner source);
extern ResourceOwner ResourceOwnerGetParent(ResourceOwner owner);
extern ResourceOwner ResourceOwnerGetNextChild(ResourceOwner owner);
extern const char * ResourceOwnerGetName(ResourceOwner owner);
extern ResourceOwner ResourceOwnerGetFirstChild(ResourceOwner owner);
extern MemoryContext ResourceOwnerGetMemCxt(ResourceOwner owner);
extern void ResourceOwnerNewParent(ResourceOwner owner, ResourceOwner newparent);
/* support for buffer refcount management */