mirror of
https://git.postgresql.org/git/postgresql.git
synced 2026-02-14 10:27:04 +08:00
Also "make reformat-dat-files". The only change worthy of note is that pgindent messed up the formatting of launcher.c's struct LogicalRepWorkerId, which led me to notice that that struct wasn't used at all anymore, so I just took it out.
155 lines
4.0 KiB
C
155 lines
4.0 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* execAsync.c
|
|
* Support routines for asynchronous execution
|
|
*
|
|
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/executor/execAsync.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "executor/execAsync.h"
|
|
#include "executor/executor.h"
|
|
#include "executor/nodeAppend.h"
|
|
#include "executor/nodeForeignscan.h"
|
|
|
|
/*
|
|
* Asynchronously request a tuple from a designed async-capable node.
|
|
*/
|
|
void
|
|
ExecAsyncRequest(AsyncRequest *areq)
|
|
{
|
|
if (areq->requestee->chgParam != NULL) /* something changed? */
|
|
ExecReScan(areq->requestee); /* let ReScan handle this */
|
|
|
|
/* must provide our own instrumentation support */
|
|
if (areq->requestee->instrument)
|
|
InstrStartNode(areq->requestee->instrument);
|
|
|
|
switch (nodeTag(areq->requestee))
|
|
{
|
|
case T_ForeignScanState:
|
|
ExecAsyncForeignScanRequest(areq);
|
|
break;
|
|
default:
|
|
/* If the node doesn't support async, caller messed up. */
|
|
elog(ERROR, "unrecognized node type: %d",
|
|
(int) nodeTag(areq->requestee));
|
|
}
|
|
|
|
ExecAsyncResponse(areq);
|
|
|
|
/* must provide our own instrumentation support */
|
|
if (areq->requestee->instrument)
|
|
InstrStopNode(areq->requestee->instrument,
|
|
TupIsNull(areq->result) ? 0.0 : 1.0);
|
|
}
|
|
|
|
/*
|
|
* Give the asynchronous node a chance to configure the file descriptor event
|
|
* for which it wishes to wait. We expect the node-type specific callback to
|
|
* make a single call of the following form:
|
|
*
|
|
* AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
|
|
*/
|
|
void
|
|
ExecAsyncConfigureWait(AsyncRequest *areq)
|
|
{
|
|
/* must provide our own instrumentation support */
|
|
if (areq->requestee->instrument)
|
|
InstrStartNode(areq->requestee->instrument);
|
|
|
|
switch (nodeTag(areq->requestee))
|
|
{
|
|
case T_ForeignScanState:
|
|
ExecAsyncForeignScanConfigureWait(areq);
|
|
break;
|
|
default:
|
|
/* If the node doesn't support async, caller messed up. */
|
|
elog(ERROR, "unrecognized node type: %d",
|
|
(int) nodeTag(areq->requestee));
|
|
}
|
|
|
|
/* must provide our own instrumentation support */
|
|
if (areq->requestee->instrument)
|
|
InstrStopNode(areq->requestee->instrument, 0.0);
|
|
}
|
|
|
|
/*
|
|
* Call the asynchronous node back when a relevant event has occurred.
|
|
*/
|
|
void
|
|
ExecAsyncNotify(AsyncRequest *areq)
|
|
{
|
|
/* must provide our own instrumentation support */
|
|
if (areq->requestee->instrument)
|
|
InstrStartNode(areq->requestee->instrument);
|
|
|
|
switch (nodeTag(areq->requestee))
|
|
{
|
|
case T_ForeignScanState:
|
|
ExecAsyncForeignScanNotify(areq);
|
|
break;
|
|
default:
|
|
/* If the node doesn't support async, caller messed up. */
|
|
elog(ERROR, "unrecognized node type: %d",
|
|
(int) nodeTag(areq->requestee));
|
|
}
|
|
|
|
ExecAsyncResponse(areq);
|
|
|
|
/* must provide our own instrumentation support */
|
|
if (areq->requestee->instrument)
|
|
InstrStopNode(areq->requestee->instrument,
|
|
TupIsNull(areq->result) ? 0.0 : 1.0);
|
|
}
|
|
|
|
/*
|
|
* Call the requestor back when an asynchronous node has produced a result.
|
|
*/
|
|
void
|
|
ExecAsyncResponse(AsyncRequest *areq)
|
|
{
|
|
switch (nodeTag(areq->requestor))
|
|
{
|
|
case T_AppendState:
|
|
ExecAsyncAppendResponse(areq);
|
|
break;
|
|
default:
|
|
/* If the node doesn't support async, caller messed up. */
|
|
elog(ERROR, "unrecognized node type: %d",
|
|
(int) nodeTag(areq->requestor));
|
|
}
|
|
}
|
|
|
|
/*
|
|
* A requestee node should call this function to deliver the tuple to its
|
|
* requestor node. The requestee node can call this from its ExecAsyncRequest
|
|
* or ExecAsyncNotify callback.
|
|
*/
|
|
void
|
|
ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
|
|
{
|
|
areq->request_complete = true;
|
|
areq->result = result;
|
|
}
|
|
|
|
/*
|
|
* A requestee node should call this function to indicate that it is pending
|
|
* for a callback. The requestee node can call this from its ExecAsyncRequest
|
|
* or ExecAsyncNotify callback.
|
|
*/
|
|
void
|
|
ExecAsyncRequestPending(AsyncRequest *areq)
|
|
{
|
|
areq->callback_pending = true;
|
|
areq->request_complete = false;
|
|
areq->result = NULL;
|
|
}
|