From fe9b5d971e84cf07278ae385a6ae50a47bdf6536 Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Thu, 1 Aug 2024 20:49:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=B9=B6=E8=A1=8C=E6=B8=B8?= =?UTF-8?q?=E6=A0=87=E7=9A=84=E8=8B=A5=E5=B9=B2=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/backend/nodes/copyfuncs.cpp | 8 +- src/common/backend/nodes/outfuncs.cpp | 8 +- src/common/backend/nodes/readfuncs.cpp | 12 +- src/common/backend/parser/parse_expr.cpp | 82 +++++++------ src/common/backend/parser/parse_relation.cpp | 2 +- src/common/backend/utils/misc/guc/guc_sql.cpp | 3 +- src/gausskernel/optimizer/plan/createplan.cpp | 91 +++++++++++---- .../optimizer/plan/streamplan_utils.cpp | 52 ++++++++- src/gausskernel/optimizer/plan/subselect.cpp | 2 + src/gausskernel/process/stream/execStream.cpp | 18 +-- .../process/threadpool/knl_session.cpp | 1 + src/gausskernel/runtime/executor/spi.cpp | 1 + src/include/executor/executor.h | 12 ++ src/include/knl/knl_session.h | 1 + src/include/nodes/plannodes.h | 2 + src/include/optimizer/stream_cost.h | 2 - src/include/parser/parse_expr.h | 2 +- .../regress/expected/cursor_expression.out | 63 ++++++++++ .../expected/parallel_enable_function.out | 109 ++++++++++++++++++ src/test/regress/expected/smp_cursor.out | 50 ++++++++ src/test/regress/sql/cursor_expression.sql | 23 ++++ .../regress/sql/parallel_enable_function.sql | 25 ++++ src/test/regress/sql/smp_cursor.sql | 19 +++ 23 files changed, 501 insertions(+), 87 deletions(-) diff --git a/src/common/backend/nodes/copyfuncs.cpp b/src/common/backend/nodes/copyfuncs.cpp index 1741812f2..b7c011adc 100644 --- a/src/common/backend/nodes/copyfuncs.cpp +++ b/src/common/backend/nodes/copyfuncs.cpp @@ -303,6 +303,10 @@ static void CopyPlanFields(const Plan* from, Plan* newnode) #ifdef USE_SPQ COPY_SCALAR_FIELD(spq_scan_partial); #endif + if (t_thrd.proc->workingVersionNum >= PARALLEL_ENABLE_VERSION_NUM) { + COPY_SCALAR_FIELD(cursor_expr_level); + COPY_SCALAR_FIELD(cursor_owner_node_id); + } newnode->rightRefState = CopyRightRefState(from->rightRefState); } @@ -2281,10 +2285,6 @@ static Stream* _copyStream(const Stream* from) #ifdef USE_SPQ COPY_SCALAR_FIELD(streamID); #endif - if (t_thrd.proc->workingVersionNum >= PARALLEL_ENABLE_VERSION_NUM) { - COPY_SCALAR_FIELD(cursor_expr_level); - COPY_SCALAR_FIELD(cursor_owner_node_id); - } return newnode; } diff --git a/src/common/backend/nodes/outfuncs.cpp b/src/common/backend/nodes/outfuncs.cpp index d59fb43fd..d5eef469f 100755 --- a/src/common/backend/nodes/outfuncs.cpp +++ b/src/common/backend/nodes/outfuncs.cpp @@ -716,6 +716,10 @@ static void _outPlanInfo(StringInfo str, Plan* node) WRITE_BOOL_FIELD(spq_scan_partial); } #endif + if (t_thrd.proc->workingVersionNum >= PARALLEL_ENABLE_VERSION_NUM) { + WRITE_INT_FIELD(cursor_expr_level); + WRITE_INT_FIELD(cursor_owner_node_id); + } } static void _outPruningResult(StringInfo str, PruningResult* node) @@ -1210,10 +1214,6 @@ static void _outStream(StringInfo str, Stream* node) #ifdef USE_SPQ WRITE_INT_FIELD(streamID); #endif - if (t_thrd.proc->workingVersionNum >= PARALLEL_ENABLE_VERSION_NUM) { - WRITE_INT_FIELD(cursor_expr_level); - WRITE_INT_FIELD(cursor_owner_node_id); - } } /* diff --git a/src/common/backend/nodes/readfuncs.cpp b/src/common/backend/nodes/readfuncs.cpp index 5b818c43b..2d0418894 100755 --- a/src/common/backend/nodes/readfuncs.cpp +++ b/src/common/backend/nodes/readfuncs.cpp @@ -767,12 +767,6 @@ THR_LOCAL bool skip_read_extern_fields = false; READ_INT_FIELD(stream_level); \ READ_NODE_FIELD(origin_consumer_nodes); \ READ_BOOL_FIELD(is_recursive_local); \ - IF_EXIST(cursor_expr_level) { \ - READ_INT_FIELD(cursor_expr_level); \ - } \ - IF_EXIST(cursor_owner_node_id) { \ - READ_INT_FIELD(cursor_owner_node_id); \ - } \ READ_STREAM_ID(); \ \ READ_DONE(); \ @@ -3530,6 +3524,12 @@ static Plan* _readPlan(Plan* local_node) READ_BOOL_FIELD(spq_scan_partial); } #endif + IF_EXIST(cursor_expr_level) { + READ_INT_FIELD(cursor_expr_level); + } + IF_EXIST(cursor_owner_node_id) { + READ_INT_FIELD(cursor_owner_node_id); + } READ_DONE(); } diff --git a/src/common/backend/parser/parse_expr.cpp b/src/common/backend/parser/parse_expr.cpp index b85da6f35..b7b881b98 100644 --- a/src/common/backend/parser/parse_expr.cpp +++ b/src/common/backend/parser/parse_expr.cpp @@ -3894,6 +3894,7 @@ static Node* transformCursorExpression(ParseState* pstate, CursorExpression* cur CursorExpression* newm = makeNode(CursorExpression); char* queryString; List* raw_parsetree_list = NIL; + List* plantree_list = NIL; PlannedStmt* plan_tree; ListCell* raw_parsetree_cell = NULL; List* stmt_list = NIL; @@ -3903,6 +3904,7 @@ static Node* transformCursorExpression(ParseState* pstate, CursorExpression* cur if (!smp) { dopControl.CloseSmp(); + dopControl.UnderCursor(); } ParseState* parse_state_parent = pstate; @@ -3915,48 +3917,57 @@ static Node* transformCursorExpression(ParseState* pstate, CursorExpression* cur parse_state_temp = parse_state_temp->parentParseState; } - queryString = pstrdup(cursor_expression->raw_query_str); - raw_parsetree_list = pg_parse_query(queryString); - foreach (raw_parsetree_cell, raw_parsetree_list) { - Node* parsetree = (Node*)lfirst(raw_parsetree_cell); - List* querytree_list = pg_analyze_and_rewrite(parsetree, queryString, NULL, 0, parse_state_parent); - stmt_list = list_concat(stmt_list, querytree_list); - } + PG_TRY(); + { + queryString = pstrdup(cursor_expression->raw_query_str); + raw_parsetree_list = pg_parse_query(queryString); + foreach (raw_parsetree_cell, raw_parsetree_list) { + Node* parsetree = (Node*)lfirst(raw_parsetree_cell); + List* querytree_list = pg_analyze_and_rewrite(parsetree, queryString, NULL, 0, parse_state_parent); + stmt_list = list_concat(stmt_list, querytree_list); + } - Query* query = castNode(Query, linitial(stmt_list)); + plantree_list = pg_plan_queries(stmt_list, 0, NULL); - plan_tree = pg_plan_query(query, 0, NULL); - - if (IsA(plan_tree->planTree, Stream)) { - ((Stream*)plan_tree->planTree)->cursor_expr_level = level; + plan_tree = castNode(PlannedStmt, linitial(plantree_list)); + plan_tree->planTree->cursor_expr_level = level; /* reset cursor_expr_level */ if (level == 1) { u_sess->parser_cxt.cursor_expr_level = 0; } - } - int nParamExec = 0; - parse_state_temp = parse_state_parent; - if (parse_state_temp != NULL) { - nParamExec = list_length(parse_state_temp->cursor_expression_para_var); - } - - plan_tree->nParamExec = nParamExec; - newm->plan = (Node*)plan_tree; - newm->options = cursor_expression->options; - newm->raw_query_str = queryString; - newm->param = (List*)copyObject(parse_state_parent->cursor_expression_para_var); + int nParamExec = 0; + parse_state_temp = parse_state_parent; + if (parse_state_temp != NULL) { + nParamExec = list_length(parse_state_temp->cursor_expression_para_var); + } + + plan_tree->nParamExec = nParamExec; + newm->plan = (Node*)plan_tree; + newm->options = cursor_expression->options; + newm->raw_query_str = queryString; + newm->param = (List*)copyObject(parse_state_parent->cursor_expression_para_var); - if (pstate->p_pre_columnref_hook == NULL && pstate->p_post_columnref_hook == NULL && - pstate->p_expr_kind == EXPR_KIND_SELECT_TARGET && pstate->p_expr_transform_level == 1) { - newm->is_simple_select_target = true; - } else { - newm->is_simple_select_target = false; - } + if (pstate->p_pre_columnref_hook == NULL && pstate->p_post_columnref_hook == NULL && + pstate->p_expr_kind == EXPR_KIND_SELECT_TARGET && pstate->p_expr_transform_level == 1) { + newm->is_simple_select_target = true; + } else { + newm->is_simple_select_target = false; + } - list_free_ext(stmt_list); - list_free_ext(raw_parsetree_list); + list_free_ext(stmt_list); + list_free_ext(raw_parsetree_list); + } + PG_CATCH(); + { + u_sess->parser_cxt.cursor_expr_level = 0; + /* restore smp */ + dopControl.ResetSmp(); + + PG_RE_THROW(); + } + PG_END_TRY(); /* restore parent state */ parse_state_parent->transform_outer_columnref_as_param_hook = NULL; @@ -4145,7 +4156,7 @@ static Node *transformStartWithWhereClauseColumnRef(ParseState *pstate, ColumnRe return NULL; } -PlannedStmt* getCursorStreamFromFuncArg(FuncExpr* funcexpr) +PlannedStmt* getCursorStreamFromFuncArg(FuncExpr* funcexpr, CursorExpression** ce) { ListCell* lc = NULL; foreach (lc, funcexpr->args) { @@ -4153,7 +4164,10 @@ PlannedStmt* getCursorStreamFromFuncArg(FuncExpr* funcexpr) if (IsA(arg, CursorExpression)) { CursorExpression* cursorExpr = (CursorExpression*)arg; PlannedStmt* cursorPlan = (PlannedStmt*)cursorExpr->plan; - if (IsA(cursorPlan->planTree, Stream)) { + if (cursorPlan->num_streams > 0) { + if (ce != NULL) { + *ce = cursorExpr; + } return cursorPlan; } } diff --git a/src/common/backend/parser/parse_relation.cpp b/src/common/backend/parser/parse_relation.cpp index 28c578b0e..f92c68b20 100755 --- a/src/common/backend/parser/parse_relation.cpp +++ b/src/common/backend/parser/parse_relation.cpp @@ -1794,7 +1794,7 @@ RangeTblEntry* addRangeTableEntryForFunction( */ if (IsA(funcexpr, FuncExpr)) { PlannedStmt* cursorPstmt = getCursorStreamFromFuncArg((FuncExpr*)funcexpr); - if (cursorPstmt != NULL) { + if (cursorPstmt != NULL && IsA(cursorPstmt->planTree, Stream)) { rte->cursorDop = cursorPstmt->planTree->lefttree->dop; } } diff --git a/src/common/backend/utils/misc/guc/guc_sql.cpp b/src/common/backend/utils/misc/guc/guc_sql.cpp index eeb0c890f..7e4a4f608 100755 --- a/src/common/backend/utils/misc/guc/guc_sql.cpp +++ b/src/common/backend/utils/misc/guc/guc_sql.cpp @@ -3354,7 +3354,8 @@ static void AssignQueryDop(int newval, void* extra) #ifndef ENABLE_MULTIPLE_NODES /* do not reset backend threads tag */ if (u_sess->opt_cxt.query_dop > 1 && - (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER)) { + (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER) && + !u_sess->opt_cxt.is_under_cursor) { u_sess->opt_cxt.smp_enabled = true; } #endif diff --git a/src/gausskernel/optimizer/plan/createplan.cpp b/src/gausskernel/optimizer/plan/createplan.cpp index d39559fc9..26683e746 100755 --- a/src/gausskernel/optimizer/plan/createplan.cpp +++ b/src/gausskernel/optimizer/plan/createplan.cpp @@ -6193,6 +6193,40 @@ static PartIterator* create_partIterator_plan( return partItr; } +PlannedStmt* ReBuildNonSmpPlanForCursorExpr(const char* queryString) +{ + List* raw_parsetree_list = NIL; + List* plantree_list = NIL; + List* stmt_list = NIL; + AutoDopControl dopControl; + ListCell* raw_parsetree_cell = NULL; + + dopControl.CloseSmp(); + dopControl.UnderCursor(); + + PG_TRY(); + { + raw_parsetree_list = pg_parse_query(queryString); + foreach (raw_parsetree_cell, raw_parsetree_list) { + Node* parsetree = (Node*)lfirst(raw_parsetree_cell); + List* querytree_list = pg_analyze_and_rewrite(parsetree, queryString, NULL, 0); + stmt_list = list_concat(stmt_list, querytree_list); + } + + plantree_list = pg_plan_queries(stmt_list, 0, NULL); + } + PG_CATCH(); + { + dopControl.ResetSmp(); + PG_RE_THROW(); + } + PG_END_TRY(); + /* restore smp */ + dopControl.ResetSmp(); + + return castNode(PlannedStmt, linitial(plantree_list)); +} + static FunctionScan* make_functionscan(List* qptlist, List* qpqual, Index scanrelid, Node* funcexpr, List* funccolnames, List* funccoltypes, List* funccoltypmods, List* funccolcollations) { @@ -6211,38 +6245,53 @@ static FunctionScan* make_functionscan(List* qptlist, List* qpqual, Index scanre node->funccoltypmods = funccoltypmods; node->funccolcollations = funccolcollations; + CursorExpression* ce = NULL; + PlannedStmt* cursorPstmt = getCursorStreamFromFuncArg((FuncExpr*)funcexpr, &ce); + if (cursorPstmt == NULL) { + return node; + } + if (IS_STREAM_PLAN && u_sess->opt_cxt.query_dop > 1) { FunctionPartitionStrategy strategy; List* partkey = NIL; strategy = GetParallelStrategyAndKey(((FuncExpr*)funcexpr)->funcid, &partkey); - PlannedStmt* cursorPstmt = getCursorStreamFromFuncArg((FuncExpr*)funcexpr); - if (cursorPstmt != NULL) { - Plan* cursorPlan = cursorPstmt->planTree; - Stream* stream = (Stream*)cursorPlan; + Plan* cursorPlan = cursorPstmt->planTree; - /* set plan->dop according to cursorplan */ - inherit_plan_locator_info(plan, cursorPlan->lefttree); - stream->smpDesc.consumerDop = plan->dop; + /* If top-plan is not stream, functionscan can not be parallel executed */ + if (!IsA(cursorPlan, Stream)) { + return node; + } - /* if FUNC_PARTITION_HASH is specified, set distributed_keys and distriType */ - if (strategy == FUNC_PARTITION_HASH && partkey != NIL) { - ListCell* lc1 = NULL; - foreach (lc1, cursorPlan->targetlist) { - TargetEntry* entry = (TargetEntry*)lfirst(lc1); - ListCell* lc2 = NULL; - foreach (lc2, partkey) { - if (strcmp(entry->resname, (char*)lfirst(lc2)) == 0) { - stream->distribute_keys = lappend(stream->distribute_keys, entry->expr); - break; - } + Stream* stream = (Stream*)cursorPlan; + + /* set plan->dop according to cursorplan */ + inherit_plan_locator_info(plan, cursorPlan->lefttree); + stream->smpDesc.consumerDop = plan->dop; + + /* if FUNC_PARTITION_HASH is specified, set distributed_keys and distriType */ + if (strategy == FUNC_PARTITION_HASH && partkey != NIL) { + ListCell* lc1 = NULL; + foreach (lc1, cursorPlan->targetlist) { + TargetEntry* entry = (TargetEntry*)lfirst(lc1); + ListCell* lc2 = NULL; + foreach (lc2, partkey) { + if (strcmp(entry->resname, (char*)lfirst(lc2)) == 0) { + stream->distribute_keys = lappend(stream->distribute_keys, entry->expr); + break; } } - plan->distributed_keys = stream->distribute_keys; - stream->smpDesc.distriType = list_length(plan->distributed_keys) > 0 ? - LOCAL_DISTRIBUTE : stream->smpDesc.distriType; } + plan->distributed_keys = stream->distribute_keys; + stream->smpDesc.distriType = list_length(plan->distributed_keys) > 0 ? + LOCAL_DISTRIBUTE : stream->smpDesc.distriType; } + } else { + /* + * if functionscan is disallowed to smp, and cursorPlan has stream node, + * rebuild non-smp plan. For example, subplan is not support smp. + */ + ce->plan = (Node*)ReBuildNonSmpPlanForCursorExpr(pstrdup(ce->raw_query_str)); } return node; diff --git a/src/gausskernel/optimizer/plan/streamplan_utils.cpp b/src/gausskernel/optimizer/plan/streamplan_utils.cpp index fa2f3fa70..50ca0f152 100755 --- a/src/gausskernel/optimizer/plan/streamplan_utils.cpp +++ b/src/gausskernel/optimizer/plan/streamplan_utils.cpp @@ -22,6 +22,7 @@ #include "optimizer/planner.h" #include "optimizer/restrictinfo.h" #include "optimizer/tlist.h" +#include "optimizer/planmem_walker.h" #include "parser/parse_collate.h" #include "parser/parse_coerce.h" #include "parser/parse_clause.h" @@ -1565,6 +1566,44 @@ static void set_bucketmap_index(Plan* plan, NodeGroupInfoContext* node_group_inf } } +typedef struct SetStreamPlanCursorWalkerContext { + MethodPlanWalkerContext mpwc; + + int cursor_expr_level; + int cursor_owner_node_id; +} SetStreamPlanCursorWalkerContext; + +bool set_stream_plan_cursor(Node* node_plan, void* context) +{ + if (node_plan == NULL) { + return false; + } + + if (IsA(node_plan, Stream)) { + Plan* plan = (Plan*)node_plan; + plan->cursor_expr_level = ((SetStreamPlanCursorWalkerContext*)context)->cursor_expr_level; + plan->cursor_owner_node_id = ((SetStreamPlanCursorWalkerContext*)context)->cursor_owner_node_id; + } + + return plan_tree_walker(node_plan, (MethodWalker)set_stream_plan_cursor, (void*)context); +} + +/* + * walk through plan tree to set cursor_expr_level/cursor_owner_node_id + */ +void set_stream_plan_cursor_walker(Plan* node_plan) +{ + SetStreamPlanCursorWalkerContext context; + errno_t rc = memset_s(&context, sizeof(SetStreamPlanCursorWalkerContext), 0, + sizeof(SetStreamPlanCursorWalkerContext)); + securec_check(rc, "\0", "\0"); + + context.cursor_expr_level = node_plan->cursor_expr_level; + context.cursor_owner_node_id = node_plan->cursor_owner_node_id; + + (void)set_stream_plan_cursor((Node*)node_plan, &context); +} + /* * finalize_node_id * To finalize node id and parent node id for result plan. The sequence of plan node id doesn't @@ -1606,8 +1645,9 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, /* set the index of bucketmap */ set_bucketmap_index(result_plan, node_group_info_context); } - if (is_under_stream) + if (is_under_stream) { subplan_ids[0] = *plan_node_id; + } *parent_node_id = *plan_node_id; @@ -1688,8 +1728,9 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, total_num_streams, max_push_sql_num, gather_count, subplans, subroots, initplans, subplan_ids, false, is_under_ctescan, is_data_node_exec, is_read_only, node_group_info_context); - } else + } else { break; + } /* * Note, the recursive-cte processing (stream mode), RecursiveUnion * operator is processed in a way like SubPlan initialization, we just @@ -1799,8 +1840,9 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, rq->exec_nodes = get_plan_max_ExecNodes(result_plan->lefttree, subplans); (*num_streams) = 0; } - if (!rq->is_simple) + if (!rq->is_simple) { (*max_push_sql_num)++; + } /* mark num_gather include scan_gather plan_router gather in all plan */ rq->num_gather = *gather_count; @@ -1858,8 +1900,8 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, case T_FunctionScan: { PlannedStmt* cursorPstmt = getCursorStreamFromFuncArg((FuncExpr*)((FunctionScan*)result_plan)->funcexpr); if (cursorPstmt != NULL) { - Stream* stream = (Stream*)cursorPstmt->planTree; - stream->cursor_owner_node_id = result_plan->plan_node_id; + cursorPstmt->planTree->cursor_owner_node_id = result_plan->plan_node_id; + set_stream_plan_cursor_walker(cursorPstmt->planTree); } } break; default: diff --git a/src/gausskernel/optimizer/plan/subselect.cpp b/src/gausskernel/optimizer/plan/subselect.cpp index 8df04c225..6fa6ecc99 100644 --- a/src/gausskernel/optimizer/plan/subselect.cpp +++ b/src/gausskernel/optimizer/plan/subselect.cpp @@ -711,6 +711,8 @@ static Node* make_subplan( /* Reset u_sess->opt_cxt.query_dop. */ u_sess->opt_cxt.query_dop = outerDop; + /* Reset is_stream/is_stream_support because cursorExpr in subquery would change them */ + set_default_stream(); /* Isolate the params needed by this specific subplan */ plan_params = root->plan_params; root->plan_params = NIL; diff --git a/src/gausskernel/process/stream/execStream.cpp b/src/gausskernel/process/stream/execStream.cpp index ec9d7a31e..9416ca135 100755 --- a/src/gausskernel/process/stream/execStream.cpp +++ b/src/gausskernel/process/stream/execStream.cpp @@ -620,8 +620,8 @@ static void InitStream(StreamFlowCtl* ctl, StreamTransType transType) key.queryId = pstmt->queryId; key.planNodeId = plan->plan_node_id; - key.cursorExprLevel = streamNode->cursor_expr_level; - key.cursorParentNodeId = streamNode->cursor_owner_node_id; + key.cursorExprLevel = plan->cursor_expr_level; + key.cursorParentNodeId = plan->cursor_owner_node_id; /* * MPPDB with-recursive support */ @@ -973,11 +973,13 @@ static void InitStreamFlow(StreamFlowCtl* ctl) case T_FunctionScan: { PlannedStmt* cursorPstmt = getCursorStreamFromFuncArg((FuncExpr*)((FunctionScan*)oldPlan)->funcexpr); if (cursorPstmt != NULL) { - Stream* cursorPlan = (Stream*)(cursorPstmt->planTree); - ctl->plan = (Plan*)cursorPlan; + ctl->plan = cursorPstmt->planTree; + + PlannedStmt* oldPlan = ctl->cursorPstmt; ctl->cursorPstmt = cursorPstmt; InitStreamFlow(ctl); + ctl->cursorPstmt = oldPlan; break; } } break; @@ -1191,8 +1193,8 @@ void SetupStreamRuntime(StreamState* node) key.queryId = node->ss.ps.state->es_plannedstmt->queryId; key.planNodeId = streamNode->scan.plan.plan_node_id; - key.cursorExprLevel = streamNode->cursor_expr_level; - key.cursorParentNodeId = streamNode->cursor_owner_node_id; + key.cursorExprLevel = node->ss.ps.plan->cursor_expr_level; + key.cursorParentNodeId = node->ss.ps.plan->cursor_owner_node_id; Assert(u_sess->stream_cxt.global_obj != NULL); pair = u_sess->stream_cxt.global_obj->popStreamPair(key); @@ -1231,8 +1233,8 @@ static void StartupStreamThread(StreamState* node) key.queryId = node->ss.ps.state->es_plannedstmt->queryId; key.planNodeId = node->ss.ps.plan->plan_node_id; - key.cursorExprLevel = ((Stream*)node->ss.ps.plan)->cursor_expr_level; - key.cursorParentNodeId = ((Stream*)node->ss.ps.plan)->cursor_owner_node_id; + key.cursorExprLevel = node->ss.ps.plan->cursor_expr_level; + key.cursorParentNodeId = node->ss.ps.plan->cursor_owner_node_id; Assert(u_sess->stream_cxt.global_obj != NULL); pair = u_sess->stream_cxt.global_obj->popStreamPair(key); Assert(pair->producerList != NULL); diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index 42362a938..0a5550a9f 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -231,6 +231,7 @@ static void knl_u_optimizer_init(knl_u_optimizer_context* opt_cxt) opt_cxt->query_dop_store = 1; opt_cxt->query_dop = 1; opt_cxt->smp_enabled = true; + opt_cxt->is_under_cursor = false; opt_cxt->max_query_dop = -1; opt_cxt->parallel_debug_mode = 0; diff --git a/src/gausskernel/runtime/executor/spi.cpp b/src/gausskernel/runtime/executor/spi.cpp index ba85ce547..8e58945b7 100644 --- a/src/gausskernel/runtime/executor/spi.cpp +++ b/src/gausskernel/runtime/executor/spi.cpp @@ -1667,6 +1667,7 @@ static Portal SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, ParamL #ifndef ENABLE_MULTIPLE_NODES AutoDopControl dopControl; dopControl.CloseSmp(); + dopControl.UnderCursor(); #endif NodeTag old_node_tag = t_thrd.postgres_cxt.cur_command_tag; diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 0889f858b..66b307007 100755 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -832,8 +832,10 @@ public: { if (likely(u_sess != NULL)) { m_smpEnabled = u_sess->opt_cxt.smp_enabled; + m_underCursor = u_sess->opt_cxt.is_under_cursor; } else { m_smpEnabled = true; + m_underCursor = false; } } @@ -841,6 +843,7 @@ public: { if (u_sess != NULL) { u_sess->opt_cxt.smp_enabled = m_smpEnabled; + u_sess->opt_cxt.is_under_cursor = m_underCursor; } } @@ -851,15 +854,24 @@ public: } } + void UnderCursor() + { + if (likely(u_sess != NULL)) { + u_sess->opt_cxt.is_under_cursor = true; + } + } + void ResetSmp() { if (u_sess != NULL) { u_sess->opt_cxt.smp_enabled = m_smpEnabled; + u_sess->opt_cxt.is_under_cursor = m_underCursor; } } private: bool m_smpEnabled; + bool m_underCursor; }; #ifdef USE_SPQ diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 058bd54e7..7c5f779dd 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -360,6 +360,7 @@ typedef struct knl_u_optimizer_context { /* Mark smp is enabled in procedure. */ bool smp_enabled; + bool is_under_cursor; double smp_thread_cost; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 707cefe68..86aa0cc06 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -381,6 +381,8 @@ typedef struct Plan { #ifdef USE_SPQ bool spq_scan_partial; #endif + int cursor_expr_level; + int cursor_owner_node_id; } Plan; typedef struct NdpScanCondition { // for each scan node diff --git a/src/include/optimizer/stream_cost.h b/src/include/optimizer/stream_cost.h index cee93c4aa..4316d40fc 100644 --- a/src/include/optimizer/stream_cost.h +++ b/src/include/optimizer/stream_cost.h @@ -89,8 +89,6 @@ typedef struct Stream { #ifdef USE_SPQ int streamID; #endif - int cursor_expr_level; - int cursor_owner_node_id; } Stream; extern void compute_stream_cost(StreamType type, char locator_type, double subrows, double subgblrows, diff --git a/src/include/parser/parse_expr.h b/src/include/parser/parse_expr.h index 763ae3397..1b9c1a929 100644 --- a/src/include/parser/parse_expr.h +++ b/src/include/parser/parse_expr.h @@ -39,6 +39,6 @@ extern bool IsQuerySWCBRewrite(Query *query); extern bool IsSWCBRewriteRTE(RangeTblEntry *rte); extern Datum GetTypeZeroValue(Form_pg_attribute att_tup); typedef Datum (*getTypeZeroValueFunc)(Form_pg_attribute att_tup); -extern PlannedStmt* getCursorStreamFromFuncArg(FuncExpr* funcexpr); +extern PlannedStmt* getCursorStreamFromFuncArg(FuncExpr* funcexpr, CursorExpression** ce = NULL); #endif /* PARSE_EXPR_H */ diff --git a/src/test/regress/expected/cursor_expression.out b/src/test/regress/expected/cursor_expression.out index faa4fa166..a0b5eb681 100644 --- a/src/test/regress/expected/cursor_expression.out +++ b/src/test/regress/expected/cursor_expression.out @@ -915,6 +915,69 @@ select * from test_insert; pro | (3 rows) +reset behavior_compat_options; +create index on employees(employees_id); +explain (costs off) SELECT e1.name FROM employees e1 where employees_id < 10; + QUERY PLAN +------------------------------- + Seq Scan on employees e1 + Filter: (employees_id < 10) +(2 rows) + +set enable_auto_explain = on; +set auto_explain_level = notice; +-- test plan hint in cursor expression +DECLARE CURSOR c1 IS SELECT e.name, CURSOR(SELECT /*+ set(enable_seqscan off) */ e1.name FROM employees e1 where employees_id < 10) abc FROM employees e; + v_name VARCHAR2(10); + type emp_cur_type is ref cursor; + c2 emp_cur_type; + v_name2 VARCHAR2(10); +BEGIN + OPEN c1; + fetch c1 into v_name,c2; + raise notice 'company_name : % %',v_name, c2; + fetch c2 into v_name2; + raise notice 'employee_name : %',v_name2; + close c2; + CLOSE c1; +END; +/ +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: SELECT e.name, CURSOR(SELECT /*+ set(enable_seqscan off) */ e1.name FROM employees e1 where employees_id < 10)abc FROM employees e +Name: datanode1 +--?Seq Scan on cursor_expression.employees e.* + Output: name, CURSOR(SELECT /*+ set(enable_seqscan off) */ e1.name FROM employees e1 where employees_id < 10) + + +CONTEXT: PL/pgSQL function inline_code_block line 8 at FETCH +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +CONTEXT: PL/pgSQL function inline_code_block line 8 at FETCH +NOTICE: company_name : zhangsan +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: SELECT /*+ set(enable_seqscan off) */ e1.name FROM employees e1 where employees_id < 10 +Name: datanode1 +--?Index Scan using employees_employees_id_idx on cursor_expression.employees e1.* + Output: name + Index Cond: (e1.employees_id < 10) + + +CONTEXT: PL/pgSQL function inline_code_block line 10 at FETCH +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +CONTEXT: PL/pgSQL function inline_code_block line 10 at FETCH +NOTICE: employee_name : zhangsan +set enable_auto_explain = off; -- clean drop table test_insert; drop procedure pro_cursor_0011_02; diff --git a/src/test/regress/expected/parallel_enable_function.out b/src/test/regress/expected/parallel_enable_function.out index e192b9c95..563eb3fbf 100644 --- a/src/test/regress/expected/parallel_enable_function.out +++ b/src/test/regress/expected/parallel_enable_function.out @@ -307,6 +307,115 @@ select count(*) from multi_cursor_srf(cursor (select * from employees), cursor ( 100 (1 row) +-- query dop reset after error +explain (costs off) select count(*) from multi_cursor_srf(cursor (select * from multi_cursor_srf(cursor (select * from employees))), cursor (select * from employees)); +ERROR: function multi_cursor_srf(refcursor) does not exist +LINE 1: explain (costs off) select count(*) from multi_cursor_srf(cu... + ^ +HINT: No function matches the given name and argument types. You might need to add explicit type casts. +explain (costs off) select * from employees; + QUERY PLAN +---------------------------------------- + Streaming(type: LOCAL GATHER dop: 1/2) + -> Seq Scan on employees +(2 rows) + +-- test top plan of cursor expr is not stream +explain (costs off) select count(*) from hash_srf(cursor (select * from employees limit 10)), employees; + QUERY PLAN +-------------------------------------------------------------------- + Aggregate + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Aggregate + -> Nested Loop + -> Streaming(type: LOCAL ROUNDROBIN dop: 2/1) + -> Function Scan on hash_srf + -> Materialize + -> Streaming(type: BROADCAST dop: 2/2) + -> Seq Scan on employees +(9 rows) + +select count(*) from hash_srf(cursor (select * from employees limit 10)), employees; + count +------- + 1000 +(1 row) + +explain (costs off) select count(*) from hash_srf(cursor (select * from employees a ,employees b)), employees limit 10; + QUERY PLAN +------------------------------------------------------------------------- + Limit + -> Aggregate + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Aggregate + -> Nested Loop + -> Function Scan on hash_srf + -> Materialize + -> Streaming(type: BROADCAST dop: 2/2) + -> Seq Scan on employees +(9 rows) + +select count(*) from hash_srf(cursor (select * from employees a ,employees b)), employees limit 10; + count +--------- + 1000000 +(1 row) + +-- test initplan not smp +explain (costs off) select 1, (select count(*) from hash_srf(cursor (select * from employees))) a from employees; + QUERY PLAN +----------------------------------------- + Streaming(type: LOCAL GATHER) + InitPlan 1 (returns $0) + -> Aggregate + -> Function Scan on hash_srf + -> Seq Scan on employees +(5 rows) + +-- test plan hint +set query_dop = 1; +explain (costs off) select count(*) from hash_srf(cursor (select /*+ set(query_dop 1002) */ * from employees)); -- not smp + QUERY PLAN +--------------------------------- + Aggregate + -> Function Scan on hash_srf +(2 rows) + +select count(*) from hash_srf(cursor (select /*+ set(query_dop 1002) */ * from employees)); + count +------- + 100 +(1 row) + +explain (costs off) select /*+ set(query_dop 1002) */ count(*) from hash_srf(cursor (select * from employees)); -- not smp + QUERY PLAN +--------------------------------- + Aggregate + -> Function Scan on hash_srf +(2 rows) + +select /*+ set(query_dop 1002) */ count(*) from hash_srf(cursor (select * from employees)); + count +------- + 100 +(1 row) + +explain (costs off) select /*+ set(query_dop 1002) */ count(*) from hash_srf(cursor (select /*+ set(query_dop 1002) */ * from employees)); -- smp + QUERY PLAN +---------------------------------------------- + Aggregate + -> Streaming(type: LOCAL GATHER dop: 1/2) + -> Aggregate + -> Function Scan on hash_srf +(4 rows) + +select /*+ set(query_dop 1002) */ count(*) from hash_srf(cursor (select /*+ set(query_dop 1002) */ * from employees)); + count +------- + 100 +(1 row) + +set query_dop = 1002; -- nested function call explain (costs off) select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10; QUERY PLAN diff --git a/src/test/regress/expected/smp_cursor.out b/src/test/regress/expected/smp_cursor.out index 451be46f8..9cdb422bf 100644 --- a/src/test/regress/expected/smp_cursor.out +++ b/src/test/regress/expected/smp_cursor.out @@ -545,5 +545,55 @@ select a, cursor(select * from t1) from t1 limit 10; --?.* (10 rows) +-- smp hint in cursor expr among plpgsql does not work +set enable_auto_explain = on; +set auto_explain_level = notice; +-- test plan hint in cursor expression +DECLARE CURSOR c1 IS SELECT a, CURSOR(SELECT /*+ set(query_dop 1002) */ * FROM t1) abc FROM t1; + id int; + type emp_cur_type is ref cursor; + c2 emp_cur_type; + tmp t1%rowtype; +BEGIN + OPEN c1; + fetch c1 into id,c2; + fetch c2 into tmp; + close c2; + CLOSE c1; +END; +/ +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: SELECT a, CURSOR(SELECT /*+ set(query_dop 1002) */ * FROM t1)abc FROM t1 +Name: datanode1 +--?Seq Scan on smp_cursor.t1.* + Output: a, CURSOR(SELECT /*+ set(query_dop 1002) */ * FROM t1) + + +CONTEXT: PL/pgSQL function inline_code_block line 8 at FETCH +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +CONTEXT: PL/pgSQL function inline_code_block line 8 at FETCH +NOTICE: +QueryPlan + +----------------------------NestLevel:0---------------------------- +Query Text: SELECT /*+ set(query_dop 1002) */ * FROM t1 +Name: datanode1 +--?Seq Scan on smp_cursor.t1.* + Output: a, b, c, d + + +CONTEXT: PL/pgSQL function inline_code_block line 9 at FETCH +NOTICE: +----------------------------NestLevel:0---------------------------- +--?duration.* + +CONTEXT: PL/pgSQL function inline_code_block line 9 at FETCH +set enable_auto_explain = off; drop schema smp_cursor cascade; NOTICE: drop cascades to table t1 diff --git a/src/test/regress/sql/cursor_expression.sql b/src/test/regress/sql/cursor_expression.sql index 80d5b6e6d..435c59fb3 100644 --- a/src/test/regress/sql/cursor_expression.sql +++ b/src/test/regress/sql/cursor_expression.sql @@ -527,6 +527,29 @@ create table test_insert(c1 varchar, c2 varchar); insert into test_insert SELECT department_name, CURSOR(SELECT e.name FROM employees e) FROM departments d; select * from test_insert; +reset behavior_compat_options; +create index on employees(employees_id); +explain (costs off) SELECT e1.name FROM employees e1 where employees_id < 10; +set enable_auto_explain = on; +set auto_explain_level = notice; +-- test plan hint in cursor expression +DECLARE CURSOR c1 IS SELECT e.name, CURSOR(SELECT /*+ set(enable_seqscan off) */ e1.name FROM employees e1 where employees_id < 10) abc FROM employees e; + v_name VARCHAR2(10); + type emp_cur_type is ref cursor; + c2 emp_cur_type; + v_name2 VARCHAR2(10); +BEGIN + OPEN c1; + fetch c1 into v_name,c2; + raise notice 'company_name : % %',v_name, c2; + fetch c2 into v_name2; + raise notice 'employee_name : %',v_name2; + close c2; + CLOSE c1; +END; +/ +set enable_auto_explain = off; + -- clean drop table test_insert; drop procedure pro_cursor_0011_02; diff --git a/src/test/regress/sql/parallel_enable_function.sql b/src/test/regress/sql/parallel_enable_function.sql index 64967b624..5d10f154d 100644 --- a/src/test/regress/sql/parallel_enable_function.sql +++ b/src/test/regress/sql/parallel_enable_function.sql @@ -144,6 +144,31 @@ select * from multi_cursor_srf(cursor (select * from employees), cursor (select explain (costs off) select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)); select count(*) from multi_cursor_srf(cursor (select * from employees), cursor (select * from employees)); +-- query dop reset after error +explain (costs off) select count(*) from multi_cursor_srf(cursor (select * from multi_cursor_srf(cursor (select * from employees))), cursor (select * from employees)); +explain (costs off) select * from employees; + +-- test top plan of cursor expr is not stream +explain (costs off) select count(*) from hash_srf(cursor (select * from employees limit 10)), employees; +select count(*) from hash_srf(cursor (select * from employees limit 10)), employees; + +explain (costs off) select count(*) from hash_srf(cursor (select * from employees a ,employees b)), employees limit 10; +select count(*) from hash_srf(cursor (select * from employees a ,employees b)), employees limit 10; + +-- test initplan not smp +explain (costs off) select 1, (select count(*) from hash_srf(cursor (select * from employees))) a from employees; + +-- test plan hint +set query_dop = 1; +explain (costs off) select count(*) from hash_srf(cursor (select /*+ set(query_dop 1002) */ * from employees)); -- not smp +select count(*) from hash_srf(cursor (select /*+ set(query_dop 1002) */ * from employees)); + +explain (costs off) select /*+ set(query_dop 1002) */ count(*) from hash_srf(cursor (select * from employees)); -- not smp +select /*+ set(query_dop 1002) */ count(*) from hash_srf(cursor (select * from employees)); + +explain (costs off) select /*+ set(query_dop 1002) */ count(*) from hash_srf(cursor (select /*+ set(query_dop 1002) */ * from employees)); -- smp +select /*+ set(query_dop 1002) */ count(*) from hash_srf(cursor (select /*+ set(query_dop 1002) */ * from employees)); +set query_dop = 1002; -- nested function call explain (costs off) select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10; select * from hash_srf(cursor (select * from hash_srf(cursor (select * from employees)))) limit 10; diff --git a/src/test/regress/sql/smp_cursor.sql b/src/test/regress/sql/smp_cursor.sql index f53242f8a..3c8f8dc79 100644 --- a/src/test/regress/sql/smp_cursor.sql +++ b/src/test/regress/sql/smp_cursor.sql @@ -84,4 +84,23 @@ set enable_auto_explain = off; explain (costs off) select a, cursor(select * from t1) from t1 limit 10; select a, cursor(select * from t1) from t1 limit 10; +-- smp hint in cursor expr among plpgsql does not work +set enable_auto_explain = on; +set auto_explain_level = notice; +-- test plan hint in cursor expression +DECLARE CURSOR c1 IS SELECT a, CURSOR(SELECT /*+ set(query_dop 1002) */ * FROM t1) abc FROM t1; + id int; + type emp_cur_type is ref cursor; + c2 emp_cur_type; + tmp t1%rowtype; +BEGIN + OPEN c1; + fetch c1 into id,c2; + fetch c2 into tmp; + close c2; + CLOSE c1; +END; +/ +set enable_auto_explain = off; + drop schema smp_cursor cascade; \ No newline at end of file