support smp cursor

This commit is contained in:
chenxiaobin19
2024-06-28 16:10:52 +08:00
parent 3a3e54a07a
commit 65da7e14f4
13 changed files with 106 additions and 33 deletions

View File

@ -105,7 +105,7 @@ typedef struct portalhashent {
inline void ReleaseStreamGroup(Portal portal)
{
#ifndef ENABLE_MULTIPLE_NODES
if (!IS_SPQ_RUNNING && !StreamThreadAmI()) {
if (!IS_SPQ_RUNNING && IS_STREAM_PORTAL) {
portal->streamInfo.AttachToSession();
StreamNodeGroup::ReleaseStreamGroup(true);
portal->streamInfo.Reset();
@ -920,7 +920,7 @@ void AtAbort_Portals(bool STP_rollback)
* estate is under the queryDesc, and stream threads use it.
* we should wait all stream threads exit to cleanup queryDesc.
*/
if (!StreamThreadAmI()) {
if (IS_STREAM_PORTAL) {
portal->streamInfo.AttachToSession();
StreamNodeGroup::ReleaseStreamGroup(true, STREAM_ERROR);
portal->streamInfo.Reset();

View File

@ -142,6 +142,10 @@ void PerformCursorOpen(PlannedStmt* stmt, ParamListInfo params, const char* quer
*/
PortalStart(portal, params, 0, GetActiveSnapshot());
if (u_sess->stream_cxt.global_obj != NULL) {
portal->streamInfo.ResetEnvForCursor();
}
if (u_sess->pgxc_cxt.gc_fdw_snapshot) {
PopActiveSnapshot();
}
@ -286,6 +290,11 @@ void PortalCleanup(Portal portal)
saveResourceOwner = t_thrd.utils_cxt.CurrentResourceOwner;
PG_TRY();
{
#ifndef ENABLE_MULTIPLE_NODES
if (IS_STREAM_PORTAL) {
portal->streamInfo.AttachToSession();
}
#endif
t_thrd.utils_cxt.CurrentResourceOwner = portal->resowner;
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
@ -294,14 +303,12 @@ void PortalCleanup(Portal portal)
* estate is under the queryDesc, and stream threads use it.
* we should wait all stream threads exit to cleanup queryDesc.
*/
if (!StreamThreadAmI()) {
portal->streamInfo.AttachToSession();
if (IS_STREAM_PORTAL) {
StreamNodeGroup::ReleaseStreamGroup(true);
portal->streamInfo.Reset();
}
#else
if (t_thrd.spq_ctx.spq_role == ROLE_UTILITY && !StreamThreadAmI()) {
portal->streamInfo.AttachToSession();
if (t_thrd.spq_ctx.spq_role == ROLE_UTILITY && IS_STREAM_PORTAL) {
StreamNodeGroup::ReleaseStreamGroup(true);
portal->streamInfo.Reset();
}
@ -421,11 +428,21 @@ void PersistHoldablePortal(Portal portal, bool is_rollback)
PushActiveSnapshot(queryDesc->snapshot);
/*
* Rewind the executor: we need to store the entire result set in the
* tuplestore, so that subsequent backward FETCHs can be processed.
*/
ExecutorRewind(queryDesc);
if (IsA(queryDesc->planstate, StreamState)) {
/*
* Record current position when transaction commit for cursor with stream plan,
* so that subsequent absolute FETCHs can be processed properly.
*/
portal->commitPortalPos = portal->portalPos;
portal->portalPos = 0;
} else {
/*
* Rewind the executor: we need to store the entire result set in the
* tuplestore, so that subsequent backward FETCHs can be processed.
*/
portal->commitPortalPos = 0;
ExecutorRewind(queryDesc);
}
/*
* Change the destination to output to the tuplestore. Note we tell
@ -455,6 +472,15 @@ void PersistHoldablePortal(Portal portal, bool is_rollback)
portal->queryDesc = NULL; /* prevent double shutdown */
ExecutorFinish(queryDesc);
ExecutorEnd(queryDesc);
#ifndef ENABLE_MULTIPLE_NODES
if (IS_STREAM_PORTAL) {
portal->streamInfo.AttachToSession();
StreamNodeGroup::ReleaseStreamGroup(true, STREAM_COMPLETE);
portal->streamInfo.Reset();
}
#endif
FreeQueryDesc(queryDesc);
/*

View File

@ -72,7 +72,7 @@
* If Stream is supported, a copy of the 'query' is returned as a backup in case generating a plan
* with Stream fails.
*/
static Query* check_shippable(bool *stream_unsupport, Query* query, shipping_context* context)
static Query* check_shippable(bool *stream_unsupport, Query* query, shipping_context* context, int cursorOptions)
{
if (u_sess->attr.attr_sql.rewrite_rule & PARTIAL_PUSH) {
*stream_unsupport = !context->query_shippable;
@ -86,9 +86,10 @@ static Query* check_shippable(bool *stream_unsupport, Query* query, shipping_con
u_sess->opt_cxt.is_dngather_support = false;
}
/* single node do not support parallel query in cursor */
/* single node support parallel query in cursor only when it is no-scroll cursor */
if (query->utilityStmt && IsA(query->utilityStmt, DeclareCursorStmt)) {
*stream_unsupport = true;
cursorOptions = cursorOptions | ((DeclareCursorStmt*)query->utilityStmt)->options;
*stream_unsupport = !(cursorOptions & CURSOR_OPT_NO_SCROLL) ? true : *stream_unsupport;
}
if (*stream_unsupport || !IS_STREAM) {
@ -128,7 +129,7 @@ PlannedStmt* pgxc_planner(Query* query, int cursorOptions, ParamListInfo boundPa
(void)stream_walker((Node*)query, (void*)(&context));
disable_unshipped_log(query, &context);
re_query = check_shippable(&stream_unsupport, query, &context);
re_query = check_shippable(&stream_unsupport, query, &context, cursorOptions);
} else {
if (unlikely(u_sess->attr.attr_sql.enable_unshipping_log)) {
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,

View File

@ -1069,6 +1069,13 @@ void BuildStreamFlow(PlannedStmt* plan)
u_sess->stream_cxt.global_obj = New(u_sess->stream_cxt.stream_runtime_mem_cxt) StreamNodeGroup();
u_sess->stream_cxt.global_obj->m_streamRuntimeContext = u_sess->stream_cxt.stream_runtime_mem_cxt;
#ifndef ENABLE_MULTIPLE_NODES
if (StreamTopConsumerAmI() && ActivePortal != NULL) {
ActivePortal->streamInfo.RecordSessionInfo();
u_sess->stream_cxt.global_obj->m_portal = ActivePortal;
}
#endif
StreamFlowCtl ctl;
ctl.pstmt = plan;
ctl.plan = plan->planTree;

View File

@ -961,6 +961,8 @@ void StreamNodeGroup::destroy(StreamObjStatus status)
#endif
u_sess->stream_cxt.global_obj->deInit(status);
delete u_sess->stream_cxt.global_obj;
u_sess->stream_cxt.cursorNodeGroupList = list_delete(u_sess->stream_cxt.cursorNodeGroupList,
u_sess->stream_cxt.global_obj);
u_sess->stream_cxt.global_obj = NULL;
}

View File

@ -793,6 +793,7 @@ void PortalStart(Portal portal, ParamListInfo params, int eflags, Snapshot snaps
portal->atStart = true;
portal->atEnd = false; /* allow fetches */
portal->portalPos = 0;
portal->commitPortalPos = 0;
portal->posOverflow = false;
PopActiveSnapshot();
@ -819,6 +820,7 @@ void PortalStart(Portal portal, ParamListInfo params, int eflags, Snapshot snaps
portal->atStart = true;
portal->atEnd = false; /* allow fetches */
portal->portalPos = 0;
portal->commitPortalPos = 0;
portal->posOverflow = false;
break;
@ -846,6 +848,7 @@ void PortalStart(Portal portal, ParamListInfo params, int eflags, Snapshot snaps
portal->atStart = true;
portal->atEnd = false; /* allow fetches */
portal->portalPos = 0;
portal->commitPortalPos = 0;
portal->posOverflow = false;
break;
@ -905,15 +908,6 @@ void PortalSetResultFormat(Portal portal, int nFormats, int16* formats)
int natts;
int i;
#ifndef ENABLE_MULTIPLE_NODES
#ifndef USE_SPQ
if (StreamTopConsumerAmI()) {
portal->streamInfo.RecordSessionInfo();
u_sess->stream_cxt.global_obj->m_portal = portal;
}
#endif
#endif
/* Do nothing if portal won't return tuples */
if (portal->tupDesc == NULL)
return;
@ -2191,10 +2185,10 @@ static long DoPortalRunFetch(Portal portal, FetchDirection fdirection, long coun
if (portal->atEnd)
pos++; /* need one extra fetch if off end */
if (count <= pos)
(void)PortalRunSelect(portal, false, pos - count + 1, None_Receiver);
else if (count > pos + 1)
(void)PortalRunSelect(portal, true, count - pos - 1, None_Receiver);
if (count - portal->commitPortalPos <= pos)
(void)PortalRunSelect(portal, false, pos - count + 1 + portal->commitPortalPos, None_Receiver);
else if (count - portal->commitPortalPos > pos + 1)
(void)PortalRunSelect(portal, true, count - pos - 1 - portal->commitPortalPos, None_Receiver);
}
return PortalRunSelect(portal, true, 1L, dest);
} else if (count < 0) {

View File

@ -326,6 +326,7 @@ static void knl_u_stream_init(knl_u_stream_context* stream_cxt)
stream_cxt->stop_query_id = 0;
stream_cxt->stream_runtime_mem_cxt = NULL;
stream_cxt->data_exchange_mem_cxt = NULL;
stream_cxt->cursorNodeGroupList = NIL;
}
static void knl_u_sig_init(knl_u_sig_context* sig_cxt)

View File

@ -366,6 +366,13 @@ void ExecReScan(PlanState* node)
ReScanExprContext(node->ps_ExprContext);
}
#ifndef ENABLE_MULTIPLE_NODES
/* if the cursor has executed by stream, it cannot rescan anymore. */
if (IsA(node, StreamState)) {
ereport(ERROR, (errmsg("cursor with stream plan do not support scan backward.")));
}
#endif
/* If need stub execution, stop rescan here */
if (!planstate_need_stub(node)) {
if (IS_PGXC_DATANODE && EXEC_IN_RECURSIVE_MODE(node->plan) && IsA(node, StreamState)) {

View File

@ -591,6 +591,16 @@ bool IsOtherProcRedistribution(PGPROC *otherProc)
*/
inline bool IsInSameTransaction(PGPROC *proc1, PGPROC *proc2)
{
if (has_backend_cursor_stream()) {
ListCell *lc;
foreach(lc, u_sess->stream_cxt.cursorNodeGroupList) {
StreamNodeGroup* streamNodeGroup = (StreamNodeGroup*)lfirst(lc);
Assert(streamNodeGroup != u_sess->stream_cxt.global_obj);
if (streamNodeGroup->inNodeGroup(proc1->pid, proc2->pid)) {
return true;
}
}
}
return u_sess->stream_cxt.global_obj == NULL ? false
: u_sess->stream_cxt.global_obj->inNodeGroup(proc1->pid, proc2->pid);
}
@ -619,8 +629,8 @@ void CancelConflictLockWaiter(PROCLOCK *proclock, LOCK *lock, LockMethod lockMet
bool conflictLocks =
((lockMethodTable->conflictTab[lockmode] & LOCKBIT_ON((unsigned int)proc->waitLockMode)) != 0);
PGPROC *leader2 = (proc->lockGroupLeader == NULL) ? proc : proc->lockGroupLeader;
bool isSameTrans = ((StreamTopConsumerAmI() || StreamThreadAmI()) && IsInSameTransaction(proc, t_thrd.proc)) ||
(leader1 == leader2);
bool isSameTrans = ((StreamTopConsumerAmI() || StreamThreadAmI() || has_backend_cursor_stream()) &&
IsInSameTransaction(proc, t_thrd.proc)) || (leader1 == leader2);
/* send term to waitqueue proc while conflict and not in a stream or lock group */
if (conflictLocks && !isSameTrans && !IsPrepareXact(proc) &&
proc->pid != 0 && gs_signal_send(proc->pid, SIGTERM) < 0) {
@ -646,7 +656,7 @@ void CancelConflictLockHolder(PROCLOCK *proclock, LOCK *lock, LockMethod lockMet
bool conflictLocks = ((lockMethodTable->conflictTab[lockmode] & otherProcLock->holdMask) != 0);
PGPROC *leader2 = (otherProcLock->tag.myProc->lockGroupLeader == NULL) ?
otherProcLock->tag.myProc : otherProcLock->tag.myProc->lockGroupLeader;
bool isSameTrans = ((StreamTopConsumerAmI() || StreamThreadAmI()) &&
bool isSameTrans = ((StreamTopConsumerAmI() || StreamThreadAmI() || has_backend_cursor_stream()) &&
IsInSameTransaction(otherProcLock->tag.myProc, t_thrd.proc)) || (leader1 == leader2);
/* send term to holder proc while conflict and not in a stream or lock group */
if (conflictLocks && !isSameTrans && !IsPrepareXact(otherProcLock->tag.myProc) &&
@ -1322,7 +1332,7 @@ int LockCheckConflicts(LockMethod lockMethodTable, LOCKMODE lockmode, LOCK *lock
* thread is in one transaction, but these threads use differnt procs.
* We need treat these procs as one proc
*/
if (StreamTopConsumerAmI() || StreamThreadAmI() || inLockGroup) {
if (StreamTopConsumerAmI() || StreamThreadAmI() || has_backend_cursor_stream() || inLockGroup) {
SHM_QUEUE *otherProcLocks = &(lock->procLocks);
PROCLOCK *otherProcLock = (PROCLOCK *)SHMQueueNext(otherProcLocks, otherProcLocks,
offsetof(PROCLOCK, lockLink));

View File

@ -58,6 +58,8 @@
#define TupleVectorMaxSize 100
#define IS_STREAM_PORTAL (!StreamThreadAmI() && portal->streamInfo.streamGroup != NULL)
struct StreamState;
class StreamObj;
class StreamNodeGroup;

View File

@ -262,6 +262,7 @@ extern bool NeedStubExecution(Plan* plan);
extern TupleTableSlot* FetchPlanSlot(PlanState* subPlanState, ProjectionInfo** projInfos, bool isinherit);
extern long ExecGetPlanMemCost(Plan* node);
extern bool executorEarlyStop();
/* ----------------------------------------------------------------
* ExecProcNode
@ -276,6 +277,11 @@ static inline TupleTableSlot *ExecProcNode(PlanState *node)
{
TupleTableSlot* result;
Assert(node->ExecProcNode);
if (unlikely(executorEarlyStop())) {
return NULL;
}
if (unlikely(node->nodeContext)) {
MemoryContext old_context = MemoryContextSwitchTo(node->nodeContext); /* Switch to Node Level Memory Context */
if (node->chgParam != NULL) /* something changed? */
@ -686,7 +692,6 @@ extern void PthreadRWlockWrlock(ResourceOwner owner, pthread_rwlock_t* rwlock);
extern void PthreadRWlockUnlock(ResourceOwner owner, pthread_rwlock_t* rwlock);
extern void PthreadRwLockInit(pthread_rwlock_t* rwlock, pthread_rwlockattr_t *attr);
extern bool executorEarlyStop();
extern void ExecEarlyFree(PlanState* node);
extern void ExecEarlyFreeBody(PlanState* node);
extern void ExecReSetRecursivePlanTree(PlanState* node);

View File

@ -112,6 +112,9 @@ typedef struct knl_u_stream_context {
class StreamProducer* producer_obj;
/* List of StreamNodeGroup belong to current session that are active in the backend */
List *cursorNodeGroupList;
MemoryContext stream_runtime_mem_cxt;
/* Shared memory context for in-memory data exchange. */
@ -3217,5 +3220,10 @@ inline void stp_reset_xact_state_and_err_msg(bool savedisAllowCommitRollback, bo
}
}
inline bool has_backend_cursor_stream()
{
return list_length(u_sess->stream_cxt.cursorNodeGroupList) > 0;
}
#endif /* SRC_INCLUDE_KNL_KNL_SESSION_H_ */

View File

@ -134,6 +134,15 @@ typedef struct PortalStream {
u_sess->debug_query_id = 0;
}
void ResetEnvForCursor()
{
Assert(u_sess->stream_cxt.global_obj != NULL);
(void)MemoryContextSwitchTo(u_sess->top_portal_cxt);
u_sess->stream_cxt.cursorNodeGroupList = lappend(u_sess->stream_cxt.cursorNodeGroupList,
u_sess->stream_cxt.global_obj);
ResetEnv();
}
void RecordSessionInfo()
{
streamGroup = u_sess->stream_cxt.global_obj;
@ -230,6 +239,7 @@ typedef struct PortalData {
bool atEnd;
bool posOverflow;
long portalPos;
long commitPortalPos;
bool hasStreamForPlpgsql; /* true if plpgsql's portal has stream may cause hang in for-loop */
/* Presentation data, primarily used by the pg_cursors system view */