diff --git a/src/bin/psql/common.cpp b/src/bin/psql/common.cpp index 501c76197..3dd7604c4 100644 --- a/src/bin/psql/common.cpp +++ b/src/bin/psql/common.cpp @@ -2147,7 +2147,8 @@ static bool is_explain_command(const char* query) while (isalpha((unsigned char)query[wordlen])) wordlen += PQmblen(&query[wordlen], pset.encoding); - if (wordlen == 7 && pg_strncasecmp(query, "explain", 7) == 0) + if ((wordlen == 7 && pg_strncasecmp(query, "explain", 7) == 0) || + (wordlen == 4 && pg_strncasecmp(query, "call", 4) == 0)) result = true; return result; diff --git a/src/common/backend/nodes/copyfuncs.cpp b/src/common/backend/nodes/copyfuncs.cpp index 0e9b2d67d..52e96b03c 100644 --- a/src/common/backend/nodes/copyfuncs.cpp +++ b/src/common/backend/nodes/copyfuncs.cpp @@ -7135,6 +7135,16 @@ static SubPartitionPruningResult *_copySubPartitionPruningResult(const SubPartit COPY_NODE_FIELD(ls_selectedSubPartitionnos); return newnode; } +/*Dolphin call stmt copy*/ + +static DolphinCallStmt *_copyDolphinCallStmt(const DolphinCallStmt *from) +{ + DolphinCallStmt* newnode = makeNode(DolphinCallStmt); + COPY_NODE_FIELD(funccall); + COPY_NODE_FIELD(funcexpr); + COPY_NODE_FIELD(outargs); + return newnode; +} /* ==================partial node copy ================================= */ @@ -8813,6 +8823,9 @@ void* copyObject(const void* from) case T_GetDiagStmt: retval = _copyGetDiagStmt((GetDiagStmt *)from); break; + case T_DolphinCallStmt: + retval = _copyDolphinCallStmt((DolphinCallStmt *)from); + break; default: ereport(ERROR, (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), errmsg("copyObject: unrecognized node type: %d", (int)nodeTag(from)))); diff --git a/src/common/backend/nodes/nodes.cpp b/src/common/backend/nodes/nodes.cpp index 50ad7fabc..7cc67c77a 100755 --- a/src/common/backend/nodes/nodes.cpp +++ b/src/common/backend/nodes/nodes.cpp @@ -610,7 +610,9 @@ static const TagStr g_tagStrArr[] = {{T_Invalid, "Invalid"}, {T_IndexHintRelationData, "IndexHintRelationData"}, {T_FunctionSources, "FunctionSources"}, {T_CondInfo, "CondInfo"}, - {T_GetDiagStmt, "GetDiagStmt"} + {T_GetDiagStmt, "GetDiagStmt"}, + {T_DolphinCallStmt, "DolphinCallStmt"}, + {T_CallContext, "CallContext"} }; char* nodeTagToString(NodeTag tag) diff --git a/src/common/pl/plpgsql/src/pl_exec.cpp b/src/common/pl/plpgsql/src/pl_exec.cpp index e02279734..82a93bf7c 100644 --- a/src/common/pl/plpgsql/src/pl_exec.cpp +++ b/src/common/pl/plpgsql/src/pl_exec.cpp @@ -74,6 +74,9 @@ extern bool checkRecompileCondition(CachedPlanSource* plansource); static const char* const raise_skip_msg = "RAISE"; +typedef int (*SpiExecuteMultiResHook)(PLpgSQL_execstate* estate, PLpgSQL_expr* expr, + PLpgSQL_stmt_execsql* plstmt, ParamListInfo paramLI, long tcount, bool* multi_res); +typedef void (*SpiMultiResExceptionHook)(); typedef struct { int nargs; /* number of arguments */ Oid* types; /* types of arguments */ @@ -2802,6 +2805,8 @@ static void exec_exception_cleanup(PLpgSQL_execstate* estate, ExceptionContext * } context->cur_edata = edata; + if (u_sess->hook_cxt.pluginMultiResExceptionHook) + ((SpiMultiResExceptionHook)u_sess->hook_cxt.pluginMultiResExceptionHook)(); if (context->hasReleased) { ereport(FATAL, (errmsg("exception happens after current savepoint released error message is: %s", @@ -6776,6 +6781,7 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st PLpgSQL_expr* expr = stmt->sqlstmt; Cursor_Data* saved_cursor_data = NULL; bool has_alloc = false; + bool multi_res_return = false; TransactionId oldTransactionId = SPI_get_top_transaction_id(); @@ -6876,7 +6882,11 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st /* * Execute the plan */ - rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI, estate->readonly_func, tcount); + if (u_sess->hook_cxt.pluginSpiExecuteMultiResHook) + rc = ((SpiExecuteMultiResHook)(u_sess->hook_cxt.pluginSpiExecuteMultiResHook)) + (estate, expr, stmt, paramLI, tcount, &multi_res_return); + else + rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI, estate->readonly_func, tcount); #ifdef ENABLE_MULTIPLE_NODES if (checkAdivsorState() && checkSPIPlan(expr->plan)) { collectDynWithArgs(expr->query, paramLI, expr->plan->cursor_options); @@ -6977,7 +6987,7 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st estate->eval_lastoid = u_sess->SPI_cxt.lastoid; /* Process INTO if present */ - if (stmt->into) { + if (stmt->into && !multi_res_return) { SPITupleTable* tuptab = SPI_tuptable; uint32 n = SPI_processed; PLpgSQL_rec* rec = NULL; diff --git a/src/gausskernel/process/tcop/dest.cpp b/src/gausskernel/process/tcop/dest.cpp index da5524ee7..977871fe2 100644 --- a/src/gausskernel/process/tcop/dest.cpp +++ b/src/gausskernel/process/tcop/dest.cpp @@ -44,6 +44,8 @@ #include "tcop/stmt_retry.h" +typedef DestReceiver* (*ProcDestReciverHook)(CommandDest dest); + /* ---------------- * dummy DestReceiver functions * ---------------- @@ -162,6 +164,8 @@ DestReceiver* CreateDestReceiver(CommandDest dest) case DestTrainModel: return CreateTrainModelDestReceiver(); + case DestSqlProcSPI: + return ((ProcDestReciverHook)u_sess->hook_cxt.pluginProcDestReciverHook)(dest); default: break; } diff --git a/src/gausskernel/process/tcop/utility.cpp b/src/gausskernel/process/tcop/utility.cpp index 3bccda724..2cab40679 100755 --- a/src/gausskernel/process/tcop/utility.cpp +++ b/src/gausskernel/process/tcop/utility.cpp @@ -9550,6 +9550,9 @@ const char* CreateCommandTag(Node* parse_tree) case T_GetDiagStmt: tag = "GET DIAGNOSTICS"; break; + case T_DolphinCallStmt: + tag = "CALL"; + break; default: elog(WARNING, "unrecognized node type: %d", (int)nodeTag(parse_tree)); tag = "?\?\?"; diff --git a/src/gausskernel/runtime/executor/spi.cpp b/src/gausskernel/runtime/executor/spi.cpp index df602a3da..cb5af345e 100644 --- a/src/gausskernel/runtime/executor/spi.cpp +++ b/src/gausskernel/runtime/executor/spi.cpp @@ -2770,6 +2770,8 @@ static int _SPI_execute_plan0(SPIPlanPtr plan, ParamListInfo paramLI, Snapshot s } DestReceiver *dest = CreateDestReceiver(canSetTag ? u_sess->SPI_cxt._current->dest : DestNone); + if (u_sess->SPI_cxt._current->dest == DestSqlProcSPI && u_sess->hook_cxt.pluginSpiReciverParamHook) + ((SpiReciverParamHook)u_sess->hook_cxt.pluginSpiReciverParamHook)(dest,plan); if (IsA(stmt, PlannedStmt) && ((PlannedStmt *)stmt)->utilityStmt == NULL) { QueryDesc *qdesc = NULL; diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h index 1fe2e0ae1..e37bc65b6 100644 --- a/src/include/executor/spi.h +++ b/src/include/executor/spi.h @@ -66,6 +66,7 @@ typedef struct SPICachedPlanStack { #define SPI_OPT_NONATOMIC (1 << 0) typedef List* (*parse_query_func)(const char *query_string, List **query_string_locationlist); +typedef void (*SpiReciverParamHook)(DestReceiver *self, SPIPlanPtr plan); /* in postgres.cpp, avoid include tcopprot.h */ extern List* raw_parser(const char* query_string, List** query_string_locationlist); diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index f195cb1cb..0f421d45e 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -2746,6 +2746,10 @@ typedef struct knl_u_hook_context { void *pluginCCHashEqFuncs; void *plpgsqlParserSetHook; void *coreYYlexHook; + void *pluginProcDestReciverHook; + void *pluginSpiReciverParamHook; + void *pluginSpiExecuteMultiResHook; + void *pluginMultiResExceptionHook; } knl_u_hook_context; typedef struct knl_u_libsw_context { diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 7ba54e626..d84777b03 100755 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -835,7 +835,9 @@ typedef enum NodeTag { /* ndpplugin tag */ T_NdpScanCondition, T_CondInfo, - T_GetDiagStmt + T_GetDiagStmt, + T_DolphinCallStmt, + T_CallContext } NodeTag; /* if you add to NodeTag also need to add nodeTagToString */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index fbd161858..f7180f8ad 100755 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2329,6 +2329,18 @@ typedef struct DropDirectoryStmt { } DropDirectoryStmt; +/* ---------------------- + * DolphinCall Type Statement, call procedure + * ---------------------- + */ +typedef struct DolphinCallStmt +{ + NodeTag type; + FuncCall *funccall; /* procedure */ + FuncExpr *funcexpr; /* transformCallstmt deal, only input args */ + List *outargs; /* output args only be UserVar */ +} DolphinCallStmt; + /* ---------------------- * Create Type Statement, set types * ---------------------- diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 72e8c3c31..f389d001a 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -111,8 +111,9 @@ typedef enum { DestTrainModel, /* results send to DB4AI model warehouse */ DestBatchHybrid, - DestTransientRel /* results sent to transient relation */ + DestTransientRel, /* results sent to transient relation */ + DestSqlProcSPI /* results sent result to libpq with spi executor */ } CommandDest; class VectorBatch; @@ -169,7 +170,7 @@ extern void NullCommand(CommandDest dest); extern void ReadyForQuery(CommandDest dest); extern void ReadyForQuery_noblock(CommandDest dest, int timeout); -extern void InitSpiPrinttupDR(DestReceiver* dr); - +extern void InitSpiPrinttupDR(DestReceiver* dr); + #endif /* !FRONTEND_PARSER */ #endif /* DEST_H */