diff --git a/src/common/backend/parser/parse_param.cpp b/src/common/backend/parser/parse_param.cpp index 6eeef12a0..0b2162ba9 100755 --- a/src/common/backend/parser/parse_param.cpp +++ b/src/common/backend/parser/parse_param.cpp @@ -88,42 +88,41 @@ void parse_variable_parameters(ParseState* pstate, Oid** paramTypes, int* numPar pstate->p_coerce_param_hook = variable_coerce_param_hook; } -static Node * variable_post_column_ref_hook(ParseState *pstate, ColumnRef *cref, Node *var) +static Node *variable_post_column_ref_hook(ParseState *pstate, ColumnRef *cref, Node *var) { - VarParamState *parstate = (VarParamState *) pstate->p_ref_hook_state; + VarParamState *parstate = (VarParamState *) pstate->p_ref_hook_state; + /* already resolved */ + if (var != NULL) { + return NULL; + } - /* already resolved */ - if (var != NULL) - return NULL; + /* did not supply parameter names */ + if (!parstate->paramTypeNames) { + return NULL; + } - /* did not supply parameter names */ - if (!parstate->paramTypeNames) - return NULL; + if (list_length(cref->fields) == 1) { + Node *field1 = (Node *) linitial(cref->fields); + char *name1; + int i; + Param *param; - if (list_length(cref->fields) == 1) - { - Node *field1 = (Node *) linitial(cref->fields); - char *name1; - int i; - Param *param; - - Assert(IsA(field1, String)); - name1 = strVal(field1); - for (i = 0; i < *parstate->numParams; i++) - if (strcmp(name1, parstate->paramTypeNames[i]) == 0) - { - param = makeNode(Param); - param->paramkind = PARAM_EXTERN; - param->paramid = i + 1; - param->paramtype = (*parstate->paramTypes)[i]; - param->paramtypmod = -1; - param->paramcollid = InvalidOid; - param->location = -1; - return (Node *) param; - } - } - - return NULL; + Assert(IsA(field1, String)); + name1 = strVal(field1); + for (i = 0; i < *parstate->numParams; i++) { + if (strcmp(name1, parstate->paramTypeNames[i]) == 0) { + param = makeNode(Param); + param->paramkind = PARAM_EXTERN; + param->paramid = i + 1; + param->paramtype = (*parstate->paramTypes)[i]; + param->paramtypmod = -1; + param->paramcollid = InvalidOid; + param->location = -1; + return (Node *) param; + } + } + } + return NULL; } /* diff --git a/src/common/backend/utils/error/elog.cpp b/src/common/backend/utils/error/elog.cpp index fb8b5ccf2..cca730c7b 100644 --- a/src/common/backend/utils/error/elog.cpp +++ b/src/common/backend/utils/error/elog.cpp @@ -94,6 +94,8 @@ #include "tcop/stmt_retry.h" #include "replication/walsender.h" +#include "libpq/pqmq.h" + #undef _ #define _(x) err_gettext(x) @@ -584,6 +586,15 @@ void errfinish(int dummy, ...) u_sess->stream_cxt.producer_obj->reportNotice(); } } else { + if (elevel == FATAL) { + if (t_thrd.msqueue_cxt.is_changed) { + pq_stop_redirect_to_shm_mq(); + } + if (t_thrd.autonomous_cxt.handle) { + StopBackgroundWorker(); + } + } + /* Emit the message to the right places */ EmitErrorReport(); } @@ -1648,34 +1659,34 @@ void FlushErrorStateWithoutDeleteChildrenContext(void) * background worker processes and then propagated (with or without * modification) to the backend responsible for them. */ -void -ThrowErrorData(ErrorData *edata) +void ThrowErrorData(ErrorData *edata) { - ErrorData *newedata; - MemoryContext oldcontext; + ErrorData *newedata; + MemoryContext oldcontext; - if (!errstart(edata->elevel, edata->filename, edata->lineno, - edata->funcname, NULL)) - return; /* error is not to be reported at all */ + if (!errstart(edata->elevel, edata->filename, edata->lineno, edata->funcname, NULL)) { + /* error is not to be reported at all */ + return; + } - newedata = &t_thrd.log_cxt.errordata[t_thrd.log_cxt.errordata_stack_depth]; - t_thrd.log_cxt.recursion_depth++; - oldcontext = MemoryContextSwitchTo(ErrorContext); + newedata = &t_thrd.log_cxt.errordata[t_thrd.log_cxt.errordata_stack_depth]; + t_thrd.log_cxt.recursion_depth++; + oldcontext = MemoryContextSwitchTo(ErrorContext); - /* Copy the supplied fields to the error stack entry. */ - if (edata->sqlerrcode != 0) - newedata->sqlerrcode = edata->sqlerrcode; - if (edata->message) - newedata->message = pstrdup(edata->message); - if (edata->detail) - newedata->detail = pstrdup(edata->detail); - if (edata->detail_log) - newedata->detail_log = pstrdup(edata->detail_log); - if (edata->hint) - newedata->hint = pstrdup(edata->hint); - if (edata->context) - newedata->context = pstrdup(edata->context); - /* assume message_id is not available */ + /* Copy the supplied fields to the error stack entry. */ + if (edata->sqlerrcode != 0) + newedata->sqlerrcode = edata->sqlerrcode; + if (edata->message) + newedata->message = pstrdup(edata->message); + if (edata->detail) + newedata->detail = pstrdup(edata->detail); + if (edata->detail_log) + newedata->detail_log = pstrdup(edata->detail_log); + if (edata->hint) + newedata->hint = pstrdup(edata->hint); + if (edata->context) + newedata->context = pstrdup(edata->context); + /* assume message_id is not available */ if (newedata->filename) newedata->filename = pstrdup(edata->filename); if (newedata->funcname) @@ -1683,16 +1694,16 @@ ThrowErrorData(ErrorData *edata) if (newedata->backtrace_log) newedata->backtrace_log = pstrdup(edata->backtrace_log); - newedata->cursorpos = edata->cursorpos; - newedata->internalpos = edata->internalpos; - if (edata->internalquery) - newedata->internalquery = pstrdup(edata->internalquery); + newedata->cursorpos = edata->cursorpos; + newedata->internalpos = edata->internalpos; + if (edata->internalquery) + newedata->internalquery = pstrdup(edata->internalquery); - MemoryContextSwitchTo(oldcontext); - t_thrd.log_cxt.recursion_depth--; + MemoryContextSwitchTo(oldcontext); + t_thrd.log_cxt.recursion_depth--; - /* Process the error. */ - errfinish(0); + /* Process the error. */ + errfinish(0); } /* diff --git a/src/common/pl/plpgsql/src/pl_exec.cpp b/src/common/pl/plpgsql/src/pl_exec.cpp index ac073f06d..ae65c3242 100755 --- a/src/common/pl/plpgsql/src/pl_exec.cpp +++ b/src/common/pl/plpgsql/src/pl_exec.cpp @@ -198,6 +198,8 @@ static int check_line_validity_in_for_query(PLpgSQL_stmt_forq* stmt, int, int); static void bind_cursor_with_portal(Portal portal, PLpgSQL_execstate *estate, int varno); static char* transform_anonymous_block(char* query); static bool need_recompile_plan(SPIPlanPtr plan); +static void build_symbol_table(PLpgSQL_execstate *estate, PLpgSQL_nsitem *ns_start, + int *ret_nitems, const char ***ret_names, Oid **ret_types); /* ---------- * plpgsql_check_line_validity Called by the debugger plugin for @@ -1413,7 +1415,12 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block) bool savedIsStp = u_sess->SPI_cxt.is_stp; TransactionId oldTransactionId = InvalidTransactionId; + /* autonomous transaction */ if (block->autonomous) { + if (block->exceptions != NULL) { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Un-support feature : Autonomous transaction doesnot support exception"))); + } if (estate->func->fn_is_trigger) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -1747,9 +1754,10 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block) } estate->err_text = NULL; - if (block->autonomous) - AutonomousSessionEnd(estate->autonomous_session); + if (block->autonomous) { + AutonomousSessionEnd(estate->autonomous_session); + } /* * Handle the return code. */ @@ -2123,12 +2131,38 @@ static int exec_stmt_assign(PLpgSQL_execstate* estate, PLpgSQL_stmt_assign* stmt static int exec_stmt_perform(PLpgSQL_execstate* estate, PLpgSQL_stmt_perform* stmt) { PLpgSQL_expr* expr = stmt->expr; - TransactionId oldTransactionId = InvalidTransactionId; int rc; + TransactionId oldTransactionId = InvalidTransactionId; + /* autonomous transaction */ + if (estate->autonomous_session) { + if (expr) { + int nparams = 0; + int i; + const char **param_names = NULL; + Oid *param_types = NULL; + AutonomousPreparedStatement *astmt = NULL; + Datum *values = NULL; + bool *nulls = NULL; + AutonomousResult *aresult = NULL; + ereport(LOG, (errmsg("query %s", expr->query))); + build_symbol_table(estate, expr->ns, &nparams, ¶m_names, ¶m_types); + astmt = AutonomousSessionPrepare(estate->autonomous_session, expr->query, (int16)nparams, param_types, param_names); - if (!RecoveryInProgress()) { + values = (Datum *)palloc(nparams * sizeof(*values)); + nulls = (bool *)palloc(nparams * sizeof(*nulls)); + for (i = 0; i < nparams; i++) { + nulls[i] = true; + } + aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls); + exec_set_found(estate, (list_length(aresult->tuples) != 0)); + return PLPGSQL_RC_OK; + } else { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Syntax error: perform error"))); + } + } + if (!RecoveryInProgress()) oldTransactionId = GetTopTransactionId(); - } rc = exec_run_select(estate, expr, 0, NULL); if (rc != SPI_OK_SELECT) { @@ -3898,29 +3932,6 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st oldTransactionId = GetTopTransactionId(); } - if (estate->autonomous_session) { - int nparams = 0; - int i; - const char **param_names = NULL; - Oid *param_types = NULL; - AutonomousPreparedStatement *astmt = NULL; - Datum *values = NULL; - bool *nulls = NULL; - AutonomousResult *aresult = NULL; - t_thrd.autonomous_cxt.sqlstmt = stmt->sqlstmt; - build_symbol_table(estate, stmt->sqlstmt->ns, &nparams, ¶m_names, ¶m_types); - astmt = AutonomousSessionPrepare(estate->autonomous_session, stmt->sqlstmt->query, (int16)nparams, param_types, param_names); - - values = (Datum *)palloc(nparams * sizeof(*values)); - nulls = (bool *)palloc(nparams * sizeof(*nulls)); - for (i = 0; i < nparams; i++) { - nulls[i] = true; - } - aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls); - exec_set_found(estate, (list_length(aresult->tuples) != 0)); - return PLPGSQL_RC_OK; - } - /* * On the first call for this statement generate the plan, and detect * whether the statement is INSERT/UPDATE/DELETE/MERGE @@ -3955,6 +3966,41 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st * Set up ParamListInfo (hook function and possibly data values) */ paramLI = setup_param_list(estate, expr); + + /* autonomous transaction */ + if (estate->autonomous_session) { + int nparams = 0; + int i; + const char **param_names = NULL; + Oid *param_types = NULL; + AutonomousPreparedStatement *astmt = NULL; + Datum *values = NULL; + bool *nulls = NULL; + AutonomousResult *aresult = NULL; + t_thrd.autonomous_cxt.sqlstmt = stmt->sqlstmt; + build_symbol_table(estate, stmt->sqlstmt->ns, &nparams, ¶m_names, ¶m_types); + + if (paramLI) { + for (i = 0; i < paramLI->numParams; i++) { + ParamExternData* param = ¶mLI->params[i]; + if (!param->isnull) { + pfree_ext(paramLI); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Autonomous transaction doesnot suport variable transmition"))); + } + } + } + astmt = AutonomousSessionPrepare(estate->autonomous_session, stmt->sqlstmt->query, (int16)nparams, param_types, param_names); + + values = (Datum *)palloc(nparams * sizeof(*values)); + nulls = (bool *)palloc(nparams * sizeof(*nulls)); + for (i = 0; i < nparams; i++) { + nulls[i] = true; + } + aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls); + exec_set_found(estate, (list_length(aresult->tuples) != 0)); + return PLPGSQL_RC_OK; + } /* * If we have INTO, then we only need one row back ... but if we have INTO @@ -3979,7 +4025,7 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st } } else { tcount = 0; - } + } saved_cursor_data = estate->cursor_return_data; if (stmt->row != NULL && stmt->row->nfields > 0) { @@ -4334,11 +4380,10 @@ static int exec_stmt_dynexecute(PLpgSQL_execstate* estate, PLpgSQL_stmt_dynexecu exec_eval_cleanup(estate); - if (estate->autonomous_session) - { - (void *)AutonomousSessionExecute(estate->autonomous_session, querystr); - return PLPGSQL_RC_OK; - } + if (estate->autonomous_session) { + (void *)AutonomousSessionExecute(estate->autonomous_session, querystr); + return PLPGSQL_RC_OK; + } if (stmt->params != NULL) { stmt->ppd = (void*)exec_eval_using_params(estate, stmt->params); @@ -5084,6 +5129,7 @@ static int exec_stmt_null(PLpgSQL_execstate* estate, PLpgSQL_stmt* stmt) */ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt) { + /* autonomous transaction */ if (estate->autonomous_session) { if (t_thrd.autonomous_cxt.sqlstmt) { int nparams = 0; @@ -5100,8 +5146,7 @@ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt values = (Datum *)palloc(nparams * sizeof(*values)); nulls = (bool *)palloc(nparams * sizeof(*nulls)); - for (i = 0; i < nparams; i++) - { + for (i = 0; i < nparams; i++) { nulls[i] = true; } aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls); @@ -5113,7 +5158,7 @@ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt errmsg("Syntax error: In antonomous transaction, commit/rollback must match start transaction"))); } } - + const char* PORTAL = "Portal"; int subTransactionCount = u_sess->SPI_cxt.portal_stp_exception_counter; @@ -5176,6 +5221,7 @@ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt */ static int exec_stmt_rollback(PLpgSQL_execstate* estate, PLpgSQL_stmt_rollback* stmt) { + /* autonomous transaction */ if (estate->autonomous_session) { if (t_thrd.autonomous_cxt.sqlstmt) { int nparams = 0; @@ -5192,8 +5238,7 @@ static int exec_stmt_rollback(PLpgSQL_execstate* estate, PLpgSQL_stmt_rollback* values = (Datum *)palloc(nparams * sizeof(*values)); nulls = (bool *)palloc(nparams * sizeof(*nulls)); - for (i = 0; i < nparams; i++) - { + for (i = 0; i < nparams; i++) { nulls[i] = true; } aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls); @@ -5203,9 +5248,8 @@ static int exec_stmt_rollback(PLpgSQL_execstate* estate, PLpgSQL_stmt_rollback* } else { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Syntax error: In antonomous transaction, commit/rollback must match start transaction"))); - } + } } - const char* PORTAL = "Portal"; int subTransactionCount = u_sess->SPI_cxt.portal_stp_exception_counter; diff --git a/src/gausskernel/process/postmaster/bgworker.cpp b/src/gausskernel/process/postmaster/bgworker.cpp index 3951f5726..02eeaefa4 100644 --- a/src/gausskernel/process/postmaster/bgworker.cpp +++ b/src/gausskernel/process/postmaster/bgworker.cpp @@ -105,9 +105,7 @@ struct BackgroundWorkerHandle { static const struct { const char *fn_name; bgworker_main_type fn_addr; -} InternalBGWorkers[] = - -{ +} InternalBGWorkers[] = { { "autonomous_worker_main", autonomous_worker_main @@ -140,8 +138,8 @@ void BackgroundWorkerShmemInit(void) bool found; t_thrd.bgworker_cxt.background_worker_data = (BackgroundWorkerArray*)ShmemInitStruct("Background Worker Data", - BackgroundWorkerShmemSize(), - &found); + BackgroundWorkerShmemSize(), + &found); if (!IsUnderPostmaster) { slist_iter siter; int slotno = 0; @@ -191,7 +189,7 @@ void BackgroundWorkerShmemInit(void) * Search the postmaster's backend-private list of RegisteredBgWorker objects * for the one that maps to the given slot number. */ -static RegisteredBgWorker * FindRegisteredWorkerBySlotNumber(int slotno) +static RegisteredBgWorker* FindRegisteredWorkerBySlotNumber(int slotno) { slist_iter siter; @@ -222,7 +220,8 @@ void BackgroundWorkerStateChange(void) * max_background_workers, in case shared memory gets corrupted while we're * looping. */ - if (g_instance.attr.attr_storage.max_background_workers != t_thrd.bgworker_cxt.background_worker_data->total_slots) { + if (g_instance.attr.attr_storage.max_background_workers != + t_thrd.bgworker_cxt.background_worker_data->total_slots) { elog(LOG, "inconsistent background worker state (max_background_workers=%d, total_slots=%d", g_instance.attr.attr_storage.max_background_workers, @@ -582,7 +581,7 @@ static bool SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel) } if ((worker->bgw_restart_time < 0 && - worker->bgw_restart_time != BGW_NEVER_RESTART) || + worker->bgw_restart_time != BGW_NEVER_RESTART) || (worker->bgw_restart_time > USECS_PER_DAY / 1000)) { ereport(elevel, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -917,7 +916,7 @@ void RegisterBackgroundWorker(BackgroundWorker *worker) * free this pointer using pfree(), if desired. */ bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, - BackgroundWorkerHandle **handle) + BackgroundWorkerHandle **handle) { int slotno; bool success = false; @@ -954,11 +953,11 @@ bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker, * anything useful. */ if (parallel && (int)(t_thrd.bgworker_cxt.background_worker_data->parallel_register_count - - t_thrd.bgworker_cxt.background_worker_data->parallel_terminate_count) >= + t_thrd.bgworker_cxt.background_worker_data->parallel_terminate_count) >= g_instance.shmem_cxt.max_parallel_workers) { Assert(t_thrd.bgworker_cxt.background_worker_data->parallel_register_count - - t_thrd.bgworker_cxt.background_worker_data->parallel_terminate_count <= - MAX_PARALLEL_WORKER_LIMIT); + t_thrd.bgworker_cxt.background_worker_data->parallel_terminate_count <= + MAX_PARALLEL_WORKER_LIMIT); LWLockRelease(BackgroundWorkerLock); return false; } @@ -1108,7 +1107,6 @@ BgwHandleStatus WaitForBackgroundWorkerStartup(const BackgroundWorkerHandle *han rc = WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0); - if (rc & WL_POSTMASTER_DEATH) { status = BGWH_POSTMASTER_DIED; break; @@ -1145,7 +1143,6 @@ BgwHandleStatus WaitForBackgroundWorkerShutdown(const BackgroundWorkerHandle *ha rc = WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0); - if (rc & WL_POSTMASTER_DEATH) { status = BGWH_POSTMASTER_DIED; break; @@ -1185,6 +1182,13 @@ void TerminateBackgroundWorker(const BackgroundWorkerHandle *handle) } } +void StopBackgroundWorker() +{ + TerminateBackgroundWorker(t_thrd.autonomous_cxt.handle); + (void)WaitForBackgroundWorkerShutdown(t_thrd.autonomous_cxt.handle); + t_thrd.autonomous_cxt.handle = NULL; +} + /* * Look up (and possibly load) a bgworker entry point function. * @@ -1234,7 +1238,7 @@ static bgworker_main_type LookupBackgroundWorkerFunction(const char *libraryname * to be used before calling this function again. This is so that the caller * doesn't have to worry about the background worker locking protocol. */ -const char * GetBackgroundWorkerTypeByPid(ThreadId pid) +const char* GetBackgroundWorkerTypeByPid(ThreadId pid) { int slotno; bool found = false; @@ -1325,4 +1329,3 @@ void BackgroundWorkerUnblockSignals(void) (void)gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL); } - diff --git a/src/gausskernel/process/tcop/autonomous.cpp b/src/gausskernel/process/tcop/autonomous.cpp index 9e3c40e6b..282e0d3a7 100644 --- a/src/gausskernel/process/tcop/autonomous.cpp +++ b/src/gausskernel/process/tcop/autonomous.cpp @@ -108,7 +108,7 @@ static void forward_NotifyResponse(StringInfo msg); static void rethrow_errornotice(StringInfo msg); static void invalid_protocol_message(char msgtype); -AutonomousSession * AutonomousSessionStart(void) +AutonomousSession *AutonomousSessionStart(void) { BackgroundWorker worker = {0}; ThreadId pid; @@ -562,7 +562,9 @@ void autonomous_worker_main(Datum main_arg) shm_mq_set_sender(response_mq, t_thrd.proc); response_qh = shm_mq_attach(response_mq, seg, NULL); - pq_redirect_to_shm_mq(response_qh); + if (!t_thrd.msqueue_cxt.is_changed) { + pq_redirect_to_shm_mq(response_qh); + } BackgroundWorkerInitializeConnectionByOid(fdata->database_id, fdata->authenticated_user_id); @@ -730,6 +732,7 @@ void autonomous_worker_main(Datum main_arg) break; } case 'X': + case 'N': break; default: ereport(ERROR, @@ -813,15 +816,12 @@ static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg) for (i = 0; i < natts; i++) { int32 len = pq_getmsgint(msg, 4); - if (len < 0) nulls[i] = true; else { Oid recvid; Oid typioparams; - nulls[i] = false; - getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid, &recvid, &typioparams); @@ -849,17 +849,14 @@ static void forward_NotifyResponse(StringInfo msg) NotifyMyFrontEnd(channel, payload, pid); } - static void rethrow_errornotice(StringInfo msg) { ErrorData edata; - pq_parse_errornotice(msg, &edata); edata.elevel = Min(edata.elevel, ERROR); ThrowErrorData(&edata); } - static void invalid_protocol_message(char msgtype) { ereport(ERROR, diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index 58933bbdc..b544f0459 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -205,7 +205,6 @@ static void get_query_result(TupleTableSlot* slot, DestReceiver* self); * @hdfs * Define different mesage type used for exec_simple_query */ -//typedef enum { QUERY_MESSAGE = 0, HYBRID_MESSAGE } MessageType; /* ---------------------------------------------------------------- * decls for routines only used in this file @@ -237,8 +236,6 @@ extern void CancelAutoAnalyze(); extern List* RevalidateCachedQuery(CachedPlanSource* plansource); static void InitRecursiveCTEGlobalVariables(const PlannedStmt* planstmt); -THR_LOCAL bool needEnd = true; - bool StreamThreadAmI() { return (t_thrd.role == STREAM_WORKER); @@ -7400,12 +7397,12 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam int curTryCounter; int* oldTryCounter = NULL; if (sigsetjmp(local_sigjmp_buf, 1) != 0) { - if (t_thrd.msqueue_cxt.is_changed == true) { + /* error process of autonomous transaction */ + if (t_thrd.msqueue_cxt.is_changed) { pq_stop_redirect_to_shm_mq(); } if (t_thrd.autonomous_cxt.handle) { - TerminateBackgroundWorker(t_thrd.autonomous_cxt.handle); - t_thrd.autonomous_cxt.handle = NULL; + StopBackgroundWorker(); } gstrace_tryblock_exit(true, oldTryCounter); diff --git a/src/gausskernel/storage/ipc/shm_toc.cpp b/src/gausskernel/storage/ipc/shm_toc.cpp index dcf6cbad3..5cddf79b9 100644 --- a/src/gausskernel/storage/ipc/shm_toc.cpp +++ b/src/gausskernel/storage/ipc/shm_toc.cpp @@ -17,14 +17,12 @@ #include "storage/shm_toc.h" #include "storage/spin.h" -struct shm_toc_entry -{ +struct shm_toc_entry { uint64 key; /* Arbitrary identifier */ uint64 offset; /* Bytes offset */ }; -struct shm_toc -{ +struct shm_toc { uint64 toc_magic; /* Magic number for this TOC */ slock_t toc_mutex; /* Spinlock for mutual exclusion */ Size toc_total_bytes; /* Bytes managed by this TOC */ @@ -93,19 +91,17 @@ extern void *shm_toc_allocate(shm_toc *toc, Size nbytes) total_bytes = vtoc->toc_total_bytes; allocated_bytes = vtoc->toc_allocated_bytes; nentry = vtoc->toc_nentry; - toc_bytes = offsetof(shm_toc, toc_entry) +nentry * sizeof(shm_toc_entry) + toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry) + allocated_bytes; /* Check for memory exhaustion and overflow. */ - if (toc_bytes + nbytes > total_bytes || toc_bytes + nbytes < toc_bytes) - { + if (toc_bytes + nbytes > total_bytes || toc_bytes + nbytes < toc_bytes) { SpinLockRelease(&toc->toc_mutex); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"))); } vtoc->toc_allocated_bytes += nbytes; - SpinLockRelease(&toc->toc_mutex); return ((char *) toc) + (total_bytes - allocated_bytes - nbytes); @@ -128,7 +124,7 @@ extern Size shm_toc_freespace(shm_toc *toc) nentry = vtoc->toc_nentry; SpinLockRelease(&toc->toc_mutex); - toc_bytes = offsetof(shm_toc, toc_entry) +nentry * sizeof(shm_toc_entry); + toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry); Assert(allocated_bytes + BUFFERALIGN(toc_bytes) <= total_bytes); return total_bytes - (allocated_bytes + BUFFERALIGN(toc_bytes)); } @@ -153,8 +149,7 @@ extern Size shm_toc_freespace(shm_toc *toc) * pointers that they need to bootstrap. If you're storing a lot of stuff in * here, you're doing it wrong. */ -void -shm_toc_insert(shm_toc *toc, uint64 key, void *address) +void shm_toc_insert(shm_toc *toc, uint64 key, void *address) { volatile shm_toc *vtoc = toc; uint64 total_bytes; @@ -172,13 +167,12 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address) total_bytes = vtoc->toc_total_bytes; allocated_bytes = vtoc->toc_allocated_bytes; nentry = vtoc->toc_nentry; - toc_bytes = offsetof(shm_toc, toc_entry) +nentry * sizeof(shm_toc_entry) + toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry) + allocated_bytes; /* Check for memory exhaustion and overflow. */ if (toc_bytes + sizeof(shm_toc_entry) > total_bytes || - toc_bytes + sizeof(shm_toc_entry) < toc_bytes) - { + toc_bytes + sizeof(shm_toc_entry) < toc_bytes) { SpinLockRelease(&toc->toc_mutex); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -232,8 +226,7 @@ void *shm_toc_lookup(shm_toc *toc, uint64 key) * Estimate how much shared memory will be required to store a TOC and its * dependent data structures. */ -Size -shm_toc_estimate(shm_toc_estimator *e) +Size shm_toc_estimate(shm_toc_estimator *e) { return add_size(offsetof(shm_toc, toc_entry), add_size(mul_size(e->number_of_keys, sizeof(shm_toc_entry)), diff --git a/src/include/postgres.h b/src/include/postgres.h index 9cf80535b..f5778501e 100644 --- a/src/include/postgres.h +++ b/src/include/postgres.h @@ -114,6 +114,9 @@ /* the maximum number of autovacuum launcher thread */ #define AV_LAUNCHER_PROCS 2 +/* the number of Job Schedule Lancher thread */ +#define JOB_SCHEDULE_LAUNCHER_PROCS 1 + #ifdef ENABLE_MULTIPLE_NODES #define STREAM_RESERVE_PROC_TIMES (16) #else diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index 2c3d5c6e9..6d529e0ff 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -123,6 +123,7 @@ extern BgwHandleStatus WaitForBackgroundWorkerStartup(const BackgroundWorkerHand extern BgwHandleStatus WaitForBackgroundWorkerShutdown(const BackgroundWorkerHandle *handle); extern const char *GetBackgroundWorkerTypeByPid(ThreadId pid); +extern void StopBackgroundWorker(); /* Terminate a bgworker */ extern void TerminateBackgroundWorker(const BackgroundWorkerHandle *handle); diff --git a/src/include/storage/shm_toc.h b/src/include/storage/shm_toc.h index eb7c4d750..bae9d0a89 100644 --- a/src/include/storage/shm_toc.h +++ b/src/include/storage/shm_toc.h @@ -40,8 +40,7 @@ extern void *shm_toc_lookup(shm_toc *toc, uint64 key); * Tools for estimating how large a chunk of shared memory will be needed * to store a TOC and its dependent objects. */ -typedef struct -{ +typedef struct { Size space_for_chunks; Size number_of_keys; } shm_toc_estimator; diff --git a/src/include/tcop/autonomous.h b/src/include/tcop/autonomous.h index 9a59cf416..237c904f2 100644 --- a/src/include/tcop/autonomous.h +++ b/src/include/tcop/autonomous.h @@ -25,8 +25,7 @@ typedef struct AutonomousPreparedStatement AutonomousPreparedStatement; struct autonomous_session_fixed_data; typedef struct autonomous_session_fixed_data autonomous_session_fixed_data; -typedef struct AutonomousResult -{ +typedef struct AutonomousResult { TupleDesc tupdesc; List *tuples; char *command; @@ -36,7 +35,7 @@ AutonomousSession *AutonomousSessionStart(void); void AutonomousSessionEnd(AutonomousSession *session); AutonomousResult *AutonomousSessionExecute(AutonomousSession *session, const char *sql); AutonomousPreparedStatement *AutonomousSessionPrepare(AutonomousSession *session, const char *sql, int16 nargs, - Oid argtypes[], const char *argnames[]); + Oid argtypes[], const char *argnames[]); AutonomousResult *AutonomousSessionExecutePrepared(AutonomousPreparedStatement *stmt, int16 nargs, Datum *values, bool *nulls); extern void autonomous_worker_main(Datum main_arg); diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index 6532c407b..c43f56d17 100755 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -544,8 +544,7 @@ extern inline int defence_errlevel(void) return ERROR; #endif } - -#endif +#endif /* * Write errors to stderr (or by equal means when stderr is