From 65da7e14f4bd704f73054f9cf8566d431827652d Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Fri, 28 Jun 2024 16:10:52 +0800 Subject: [PATCH] support smp cursor --- src/common/backend/utils/mmgr/portalmem.cpp | 4 +- .../optimizer/commands/portalcmds.cpp | 44 +++++++++++++++---- .../optimizer/plan/pgxcplan_single.cpp | 9 ++-- src/gausskernel/process/stream/execStream.cpp | 7 +++ src/gausskernel/process/stream/streamCore.cpp | 2 + src/gausskernel/process/tcop/pquery.cpp | 20 +++------ .../process/threadpool/knl_session.cpp | 1 + src/gausskernel/runtime/executor/execAmi.cpp | 7 +++ src/gausskernel/storage/lmgr/lock.cpp | 18 ++++++-- src/include/distributelayer/streamCore.h | 2 + src/include/executor/executor.h | 7 ++- src/include/knl/knl_session.h | 8 ++++ src/include/utils/portal.h | 10 +++++ 13 files changed, 106 insertions(+), 33 deletions(-) diff --git a/src/common/backend/utils/mmgr/portalmem.cpp b/src/common/backend/utils/mmgr/portalmem.cpp index f170f8eba..223f6dcfa 100755 --- a/src/common/backend/utils/mmgr/portalmem.cpp +++ b/src/common/backend/utils/mmgr/portalmem.cpp @@ -105,7 +105,7 @@ typedef struct portalhashent { inline void ReleaseStreamGroup(Portal portal) { #ifndef ENABLE_MULTIPLE_NODES - if (!IS_SPQ_RUNNING && !StreamThreadAmI()) { + if (!IS_SPQ_RUNNING && IS_STREAM_PORTAL) { portal->streamInfo.AttachToSession(); StreamNodeGroup::ReleaseStreamGroup(true); portal->streamInfo.Reset(); @@ -920,7 +920,7 @@ void AtAbort_Portals(bool STP_rollback) * estate is under the queryDesc, and stream threads use it. * we should wait all stream threads exit to cleanup queryDesc. 
*/ - if (!StreamThreadAmI()) { + if (IS_STREAM_PORTAL) { portal->streamInfo.AttachToSession(); StreamNodeGroup::ReleaseStreamGroup(true, STREAM_ERROR); portal->streamInfo.Reset(); diff --git a/src/gausskernel/optimizer/commands/portalcmds.cpp b/src/gausskernel/optimizer/commands/portalcmds.cpp index 672a64f35..920460904 100644 --- a/src/gausskernel/optimizer/commands/portalcmds.cpp +++ b/src/gausskernel/optimizer/commands/portalcmds.cpp @@ -142,6 +142,10 @@ void PerformCursorOpen(PlannedStmt* stmt, ParamListInfo params, const char* quer */ PortalStart(portal, params, 0, GetActiveSnapshot()); + if (u_sess->stream_cxt.global_obj != NULL) { + portal->streamInfo.ResetEnvForCursor(); + } + if (u_sess->pgxc_cxt.gc_fdw_snapshot) { PopActiveSnapshot(); } @@ -286,6 +290,11 @@ void PortalCleanup(Portal portal) saveResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner; PG_TRY(); { +#ifndef ENABLE_MULTIPLE_NODES + if (IS_STREAM_PORTAL) { + portal->streamInfo.AttachToSession(); + } +#endif t_thrd.utils_cxt.CurrentResourceOwner = portal->resowner; ExecutorFinish(queryDesc); ExecutorEnd(queryDesc); @@ -294,14 +303,12 @@ void PortalCleanup(Portal portal) * estate is under the queryDesc, and stream threads use it. * we should wait all stream threads exit to cleanup queryDesc. */ - if (!StreamThreadAmI()) { - portal->streamInfo.AttachToSession(); + if (IS_STREAM_PORTAL) { StreamNodeGroup::ReleaseStreamGroup(true); portal->streamInfo.Reset(); } #else - if (t_thrd.spq_ctx.spq_role == ROLE_UTILITY && !StreamThreadAmI()) { - portal->streamInfo.AttachToSession(); + if (t_thrd.spq_ctx.spq_role == ROLE_UTILITY && IS_STREAM_PORTAL) { StreamNodeGroup::ReleaseStreamGroup(true); portal->streamInfo.Reset(); } @@ -421,11 +428,21 @@ void PersistHoldablePortal(Portal portal, bool is_rollback) PushActiveSnapshot(queryDesc->snapshot); - /* - * Rewind the executor: we need to store the entire result set in the - * tuplestore, so that subsequent backward FETCHs can be processed. 
- */ - ExecutorRewind(queryDesc); + if (IsA(queryDesc->planstate, StreamState)) { + /* + * Record current position when transaction commit for cursor with stream plan, + * so that subsequent absolute FETCHs can be processed properly. + */ + portal->commitPortalPos = portal->portalPos; + portal->portalPos = 0; + } else { + /* + * Rewind the executor: we need to store the entire result set in the + * tuplestore, so that subsequent backward FETCHs can be processed. + */ + portal->commitPortalPos = 0; + ExecutorRewind(queryDesc); + } /* * Change the destination to output to the tuplestore. Note we tell @@ -455,6 +472,15 @@ void PersistHoldablePortal(Portal portal, bool is_rollback) portal->queryDesc = NULL; /* prevent double shutdown */ ExecutorFinish(queryDesc); ExecutorEnd(queryDesc); + +#ifndef ENABLE_MULTIPLE_NODES + if (IS_STREAM_PORTAL) { + portal->streamInfo.AttachToSession(); + StreamNodeGroup::ReleaseStreamGroup(true, STREAM_COMPLETE); + portal->streamInfo.Reset(); + } +#endif + FreeQueryDesc(queryDesc); /* diff --git a/src/gausskernel/optimizer/plan/pgxcplan_single.cpp b/src/gausskernel/optimizer/plan/pgxcplan_single.cpp index 87c7d85db..fd06bb239 100755 --- a/src/gausskernel/optimizer/plan/pgxcplan_single.cpp +++ b/src/gausskernel/optimizer/plan/pgxcplan_single.cpp @@ -72,7 +72,7 @@ * If Stream is supported, a copy of the 'query' is returned as a backup in case generating a plan * with Stream fails. 
*/ -static Query* check_shippable(bool *stream_unsupport, Query* query, shipping_context* context) +static Query* check_shippable(bool *stream_unsupport, Query* query, shipping_context* context, int cursorOptions) { if (u_sess->attr.attr_sql.rewrite_rule & PARTIAL_PUSH) { *stream_unsupport = !context->query_shippable; @@ -86,9 +86,10 @@ static Query* check_shippable(bool *stream_unsupport, Query* query, shipping_con u_sess->opt_cxt.is_dngather_support = false; } - /* single node do not support parallel query in cursor */ + /* a single node supports parallel query in a cursor only when it is a NO SCROLL cursor */ if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) { - *stream_unsupport = true; + cursorOptions = cursorOptions | ((DeclareCursorStmt*)query->utilityStmt)->options; + *stream_unsupport = !(cursorOptions & CURSOR_OPT_NO_SCROLL) ? true : *stream_unsupport; } if (*stream_unsupport || !IS_STREAM) { @@ -128,7 +129,7 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa (void)stream_walker((Node*)query, (void*)(&context)); disable_unshipped_log(query, &context); - re_query = check_shippable(&stream_unsupport, query, &context); + re_query = check_shippable(&stream_unsupport, query, &context, cursorOptions); } else { if (unlikely(u_sess->attr.attr_sql.enable_unshipping_log)) { errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason, diff --git a/src/gausskernel/process/stream/execStream.cpp b/src/gausskernel/process/stream/execStream.cpp index ae0e0633b..d6654253d 100755 --- a/src/gausskernel/process/stream/execStream.cpp +++ b/src/gausskernel/process/stream/execStream.cpp @@ -1069,6 +1069,13 @@ void BuildStreamFlow(PlannedStmt* plan) u_sess->stream_cxt.global_obj = New(u_sess->stream_cxt.stream_runtime_mem_cxt) StreamNodeGroup(); u_sess->stream_cxt.global_obj->m_streamRuntimeContext = u_sess->stream_cxt.stream_runtime_mem_cxt; +#ifndef ENABLE_MULTIPLE_NODES + if (StreamTopConsumerAmI() && 
ActivePortal != NULL) { + ActivePortal->streamInfo.RecordSessionInfo(); + u_sess->stream_cxt.global_obj->m_portal = ActivePortal; + } +#endif + StreamFlowCtl ctl; ctl.pstmt = plan; ctl.plan = plan->planTree; diff --git a/src/gausskernel/process/stream/streamCore.cpp b/src/gausskernel/process/stream/streamCore.cpp index 4fdbc28c8..c5f33f79f 100755 --- a/src/gausskernel/process/stream/streamCore.cpp +++ b/src/gausskernel/process/stream/streamCore.cpp @@ -961,6 +961,8 @@ void StreamNodeGroup::destroy(StreamObjStatus status) #endif u_sess->stream_cxt.global_obj->deInit(status); delete u_sess->stream_cxt.global_obj; + u_sess->stream_cxt.cursorNodeGroupList = list_delete(u_sess->stream_cxt.cursorNodeGroupList, + u_sess->stream_cxt.global_obj); u_sess->stream_cxt.global_obj = NULL; } diff --git a/src/gausskernel/process/tcop/pquery.cpp b/src/gausskernel/process/tcop/pquery.cpp index d7252a8d5..15cbfe536 100644 --- a/src/gausskernel/process/tcop/pquery.cpp +++ b/src/gausskernel/process/tcop/pquery.cpp @@ -793,6 +793,7 @@ void PortalStart(Portal portal, ParamListInfo params, int eflags, Snapshot snaps portal->atStart = true; portal->atEnd = false; /* allow fetches */ portal->portalPos = 0; + portal->commitPortalPos = 0; portal->posOverflow = false; PopActiveSnapshot(); @@ -819,6 +820,7 @@ void PortalStart(Portal portal, ParamListInfo params, int eflags, Snapshot snaps portal->atStart = true; portal->atEnd = false; /* allow fetches */ portal->portalPos = 0; + portal->commitPortalPos = 0; portal->posOverflow = false; break; @@ -846,6 +848,7 @@ void PortalStart(Portal portal, ParamListInfo params, int eflags, Snapshot snaps portal->atStart = true; portal->atEnd = false; /* allow fetches */ portal->portalPos = 0; + portal->commitPortalPos = 0; portal->posOverflow = false; break; @@ -905,15 +908,6 @@ void PortalSetResultFormat(Portal portal, int nFormats, int16* formats) int natts; int i; -#ifndef ENABLE_MULTIPLE_NODES -#ifndef USE_SPQ - if (StreamTopConsumerAmI()) { - 
portal->streamInfo.RecordSessionInfo(); - u_sess->stream_cxt.global_obj->m_portal = portal; - } -#endif -#endif - /* Do nothing if portal won't return tuples */ if (portal->tupDesc == NULL) return; @@ -2191,10 +2185,10 @@ static long DoPortalRunFetch(Portal portal, FetchDirection fdirection, long coun if (portal->atEnd) pos++; /* need one extra fetch if off end */ - if (count <= pos) - (void)PortalRunSelect(portal, false, pos - count + 1, None_Receiver); - else if (count > pos + 1) - (void)PortalRunSelect(portal, true, count - pos - 1, None_Receiver); + if (count - portal->commitPortalPos <= pos) + (void)PortalRunSelect(portal, false, pos - count + 1 + portal->commitPortalPos, None_Receiver); + else if (count - portal->commitPortalPos > pos + 1) + (void)PortalRunSelect(portal, true, count - pos - 1 - portal->commitPortalPos, None_Receiver); } return PortalRunSelect(portal, true, 1L, dest); } else if (count < 0) { diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index 58b7568f4..96d8a737e 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -326,6 +326,7 @@ static void knl_u_stream_init(knl_u_stream_context* stream_cxt) stream_cxt->stop_query_id = 0; stream_cxt->stream_runtime_mem_cxt = NULL; stream_cxt->data_exchange_mem_cxt = NULL; + stream_cxt->cursorNodeGroupList = NIL; } static void knl_u_sig_init(knl_u_sig_context* sig_cxt) diff --git a/src/gausskernel/runtime/executor/execAmi.cpp b/src/gausskernel/runtime/executor/execAmi.cpp index dc033b704..e860c8c84 100755 --- a/src/gausskernel/runtime/executor/execAmi.cpp +++ b/src/gausskernel/runtime/executor/execAmi.cpp @@ -366,6 +366,13 @@ void ExecReScan(PlanState* node) ReScanExprContext(node->ps_ExprContext); } +#ifndef ENABLE_MULTIPLE_NODES + /* If the cursor has been executed by a stream plan, it cannot be rescanned. 
*/ + if (IsA(node, StreamState)) { + ereport(ERROR, (errmsg("cursor with stream plan do not support scan backward."))); + } +#endif + /* If need stub execution, stop rescan here */ if (!planstate_need_stub(node)) { if (IS_PGXC_DATANODE && EXEC_IN_RECURSIVE_MODE(node->plan) && IsA(node, StreamState)) { diff --git a/src/gausskernel/storage/lmgr/lock.cpp b/src/gausskernel/storage/lmgr/lock.cpp index 3ef052ed4..88acd0c8c 100644 --- a/src/gausskernel/storage/lmgr/lock.cpp +++ b/src/gausskernel/storage/lmgr/lock.cpp @@ -591,6 +591,16 @@ bool IsOtherProcRedistribution(PGPROC *otherProc) */ inline bool IsInSameTransaction(PGPROC *proc1, PGPROC *proc2) { + if (has_backend_cursor_stream()) { + ListCell *lc; + foreach(lc, u_sess->stream_cxt.cursorNodeGroupList) { + StreamNodeGroup* streamNodeGroup = (StreamNodeGroup*)lfirst(lc); + Assert(streamNodeGroup != u_sess->stream_cxt.global_obj); + if (streamNodeGroup->inNodeGroup(proc1->pid, proc2->pid)) { + return true; + } + } + } return u_sess->stream_cxt.global_obj == NULL ? false : u_sess->stream_cxt.global_obj->inNodeGroup(proc1->pid, proc2->pid); } @@ -619,8 +629,8 @@ void CancelConflictLockWaiter(PROCLOCK *proclock, LOCK *lock, LockMethod lockMet bool conflictLocks = ((lockMethodTable->conflictTab[lockmode] & LOCKBIT_ON((unsigned int)proc->waitLockMode)) != 0); PGPROC *leader2 = (proc->lockGroupLeader == NULL) ? 
proc : proc->lockGroupLeader; - bool isSameTrans = ((StreamTopConsumerAmI() || StreamThreadAmI()) && IsInSameTransaction(proc, t_thrd.proc)) || - (leader1 == leader2); + bool isSameTrans = ((StreamTopConsumerAmI() || StreamThreadAmI() || has_backend_cursor_stream()) && + IsInSameTransaction(proc, t_thrd.proc)) || (leader1 == leader2); /* send term to waitqueue proc while conflict and not in a stream or lock group */ if (conflictLocks && !isSameTrans && !IsPrepareXact(proc) && proc->pid != 0 && gs_signal_send(proc->pid, SIGTERM) < 0) { @@ -646,7 +656,7 @@ void CancelConflictLockHolder(PROCLOCK *proclock, LOCK *lock, LockMethod lockMet bool conflictLocks = ((lockMethodTable->conflictTab[lockmode] & otherProcLock->holdMask) != 0); PGPROC *leader2 = (otherProcLock->tag.myProc->lockGroupLeader == NULL) ? otherProcLock->tag.myProc : otherProcLock->tag.myProc->lockGroupLeader; - bool isSameTrans = ((StreamTopConsumerAmI() || StreamThreadAmI()) && + bool isSameTrans = ((StreamTopConsumerAmI() || StreamThreadAmI() || has_backend_cursor_stream()) && IsInSameTransaction(otherProcLock->tag.myProc, t_thrd.proc)) || (leader1 == leader2); /* send term to holder proc while conflict and not in a stream or lock group */ if (conflictLocks && !isSameTrans && !IsPrepareXact(otherProcLock->tag.myProc) && @@ -1322,7 +1332,7 @@ int LockCheckConflicts(LockMethod lockMethodTable, LOCKMODE lockmode, LOCK *lock * thread is in one transaction, but these threads use differnt procs. 
* We need treat these procs as one proc */ - if (StreamTopConsumerAmI() || StreamThreadAmI() || inLockGroup) { + if (StreamTopConsumerAmI() || StreamThreadAmI() || has_backend_cursor_stream() || inLockGroup) { SHM_QUEUE *otherProcLocks = &(lock->procLocks); PROCLOCK *otherProcLock = (PROCLOCK *)SHMQueueNext(otherProcLocks, otherProcLocks, offsetof(PROCLOCK, lockLink)); diff --git a/src/include/distributelayer/streamCore.h b/src/include/distributelayer/streamCore.h index d899cbf01..76b455c35 100755 --- a/src/include/distributelayer/streamCore.h +++ b/src/include/distributelayer/streamCore.h @@ -58,6 +58,8 @@ #define TupleVectorMaxSize 100 +#define IS_STREAM_PORTAL (!StreamThreadAmI() && portal->streamInfo.streamGroup != NULL) + struct StreamState; class StreamObj; class StreamNodeGroup; diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 136dd6d93..0889f858b 100755 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -262,6 +262,7 @@ extern bool NeedStubExecution(Plan* plan); extern TupleTableSlot* FetchPlanSlot(PlanState* subPlanState, ProjectionInfo** projInfos, bool isinherit); extern long ExecGetPlanMemCost(Plan* node); +extern bool executorEarlyStop(); /* ---------------------------------------------------------------- * ExecProcNode @@ -276,6 +277,11 @@ static inline TupleTableSlot *ExecProcNode(PlanState *node) { TupleTableSlot* result; Assert(node->ExecProcNode); + + if (unlikely(executorEarlyStop())) { + return NULL; + } + if (unlikely(node->nodeContext)) { MemoryContext old_context = MemoryContextSwitchTo(node->nodeContext); /* Switch to Node Level Memory Context */ if (node->chgParam != NULL) /* something changed? 
*/ @@ -686,7 +692,6 @@ extern void PthreadRWlockWrlock(ResourceOwner owner, pthread_rwlock_t* rwlock); extern void PthreadRWlockUnlock(ResourceOwner owner, pthread_rwlock_t* rwlock); extern void PthreadRwLockInit(pthread_rwlock_t* rwlock, pthread_rwlockattr_t *attr); -extern bool executorEarlyStop(); extern void ExecEarlyFree(PlanState* node); extern void ExecEarlyFreeBody(PlanState* node); extern void ExecReSetRecursivePlanTree(PlanState* node); diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index c45a88cdb..8dae1a566 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -112,6 +112,9 @@ typedef struct knl_u_stream_context { class StreamProducer* producer_obj; + /* List of the StreamNodeGroups belonging to the current session that are active in the backend */ + List *cursorNodeGroupList; + MemoryContext stream_runtime_mem_cxt; /* Shared memory context for in-memory data exchange. */ @@ -3217,5 +3220,10 @@ inline void stp_reset_xact_state_and_err_msg(bool savedisAllowCommitRollback, bo } } +inline bool has_backend_cursor_stream() +{ + return list_length(u_sess->stream_cxt.cursorNodeGroupList) > 0; +} + #endif /* SRC_INCLUDE_KNL_KNL_SESSION_H_ */ diff --git a/src/include/utils/portal.h b/src/include/utils/portal.h index bb8a0dae6..4aa18f6de 100644 --- a/src/include/utils/portal.h +++ b/src/include/utils/portal.h @@ -134,6 +134,15 @@ typedef struct PortalStream { u_sess->debug_query_id = 0; } + void ResetEnvForCursor() + { + Assert(u_sess->stream_cxt.global_obj != NULL); + (void)MemoryContextSwitchTo(u_sess->top_portal_cxt); + u_sess->stream_cxt.cursorNodeGroupList = lappend(u_sess->stream_cxt.cursorNodeGroupList, + u_sess->stream_cxt.global_obj); + ResetEnv(); + } + void RecordSessionInfo() { streamGroup = u_sess->stream_cxt.global_obj; @@ -230,6 +239,7 @@ typedef struct PortalData { bool atEnd; bool posOverflow; long portalPos; + long commitPortalPos; bool hasStreamForPlpgsql; /* true if plpgsql's portal has stream may 
cause hang in for-loop */ /* Presentation data, primarily used by the pg_cursors system view */