return back IsStreamSupport

This commit is contained in:
TotaJ
2021-04-29 19:48:27 +08:00
parent 38d455d126
commit 4eeef6c1c1
2 changed files with 14 additions and 5 deletions

View File

@ -115,6 +115,15 @@ static bool check_stream_plan(Plan* plan);
static bool is_upsert_query_with_update_param(Node* raw_parse_tree);
static void GPCFillPlanCache(CachedPlanSource* plansource, bool isBuildingCustomPlan);
bool IsStreamSupport()
{
#ifdef ENABLE_MULTIPLE_NODES
return u_sess->attr.attr_sql.enable_stream_operator;
#else
return u_sess->opt_cxt.query_dop > 1;
#endif
}
/*
* InitPlanCache: initialize module during InitPostgres.
*
@ -242,7 +251,7 @@ CachedPlanSource* CreateCachedPlan(Node* raw_parse_tree, const char* query_strin
plansource->context = source_context;
#ifdef PGXC
plansource->stmt_name = (stmt_name ? pstrdup(stmt_name) : NULL);
plansource->stream_enabled = u_sess->attr.attr_sql.enable_stream_operator;
plansource->stream_enabled = IsStreamSupport();
plansource->cplan = NULL;
plansource->single_exec_node = NULL;
plansource->is_read_only = false;
@ -358,7 +367,7 @@ CachedPlanSource* CreateOneShotCachedPlan(Node* raw_parse_tree, const char* quer
#endif
#ifdef PGXC
plansource->stream_enabled = u_sess->attr.attr_sql.enable_stream_operator;
plansource->stream_enabled = IsStreamSupport();;
plansource->cplan = NULL;
plansource->single_exec_node = NULL;
plansource->is_read_only = false;
@ -1001,7 +1010,7 @@ static bool CheckCachedPlan(CachedPlanSource* plansource)
}
/* If stream_operator alreadly change, need build plan again.*/
if ((!plansource->gpc.status.InShareTable()) && plansource->stream_enabled != u_sess->attr.attr_sql.enable_stream_operator) {
if ((!plansource->gpc.status.InShareTable()) && plansource->stream_enabled != IsStreamSupport()) {
return false;
}
@ -1294,7 +1303,7 @@ static CachedPlan* BuildCachedPlan(CachedPlanSource* plansource, List* qlist, Pa
MemoryContextSwitchTo(oldcxt);
/* Set plan real u_sess->attr.attr_sql.enable_stream_operator.*/
plansource->stream_enabled = u_sess->attr.attr_sql.enable_stream_operator;
plansource->stream_enabled = IsStreamSupport();
//in shared hash table, we can not share the plan, we should throw error before this logic.
plan->is_share = false;

View File

@ -561,7 +561,7 @@ void GlobalPlanCache::RecreateCachePlan(CachedPlanSource* oldsource, const char*
GPC_LOG("recreate plan", oldsource, oldsource->stmt_name);
CachedPlanSource *newsource = CopyCachedPlan(oldsource, true);
MemoryContext oldcxt = MemoryContextSwitchTo(newsource->context);
newsource->stream_enabled = u_sess->attr.attr_sql.enable_stream_operator;
newsource->stream_enabled = IsStreamSupport();
u_sess->exec_cxt.CurrentOpFusionObj = NULL;
Assert (oldsource->gpc.status.IsSharePlan());
newsource->gpc.status.ShareInit();