fix bugs of autonomous transaction execute

This commit is contained in:
JianXian
2020-09-09 15:16:09 +08:00
parent eaa65b1403
commit 606cc08ea6
12 changed files with 203 additions and 158 deletions

View File

@ -88,42 +88,41 @@ void parse_variable_parameters(ParseState* pstate, Oid** paramTypes, int* numPar
pstate->p_coerce_param_hook = variable_coerce_param_hook;
}
static Node * variable_post_column_ref_hook(ParseState *pstate, ColumnRef *cref, Node *var)
static Node *variable_post_column_ref_hook(ParseState *pstate, ColumnRef *cref, Node *var)
{
VarParamState *parstate = (VarParamState *) pstate->p_ref_hook_state;
VarParamState *parstate = (VarParamState *) pstate->p_ref_hook_state;
/* already resolved */
if (var != NULL) {
return NULL;
}
/* already resolved */
if (var != NULL)
return NULL;
/* did not supply parameter names */
if (!parstate->paramTypeNames) {
return NULL;
}
/* did not supply parameter names */
if (!parstate->paramTypeNames)
return NULL;
if (list_length(cref->fields) == 1) {
Node *field1 = (Node *) linitial(cref->fields);
char *name1;
int i;
Param *param;
if (list_length(cref->fields) == 1)
{
Node *field1 = (Node *) linitial(cref->fields);
char *name1;
int i;
Param *param;
Assert(IsA(field1, String));
name1 = strVal(field1);
for (i = 0; i < *parstate->numParams; i++)
if (strcmp(name1, parstate->paramTypeNames[i]) == 0)
{
param = makeNode(Param);
param->paramkind = PARAM_EXTERN;
param->paramid = i + 1;
param->paramtype = (*parstate->paramTypes)[i];
param->paramtypmod = -1;
param->paramcollid = InvalidOid;
param->location = -1;
return (Node *) param;
}
}
return NULL;
Assert(IsA(field1, String));
name1 = strVal(field1);
for (i = 0; i < *parstate->numParams; i++) {
if (strcmp(name1, parstate->paramTypeNames[i]) == 0) {
param = makeNode(Param);
param->paramkind = PARAM_EXTERN;
param->paramid = i + 1;
param->paramtype = (*parstate->paramTypes)[i];
param->paramtypmod = -1;
param->paramcollid = InvalidOid;
param->location = -1;
return (Node *) param;
}
}
}
return NULL;
}
/*

View File

@ -94,6 +94,8 @@
#include "tcop/stmt_retry.h"
#include "replication/walsender.h"
#include "libpq/pqmq.h"
#undef _
#define _(x) err_gettext(x)
@ -584,6 +586,15 @@ void errfinish(int dummy, ...)
u_sess->stream_cxt.producer_obj->reportNotice();
}
} else {
if (elevel == FATAL) {
if (t_thrd.msqueue_cxt.is_changed) {
pq_stop_redirect_to_shm_mq();
}
if (t_thrd.autonomous_cxt.handle) {
StopBackgroundWorker();
}
}
/* Emit the message to the right places */
EmitErrorReport();
}
@ -1648,34 +1659,34 @@ void FlushErrorStateWithoutDeleteChildrenContext(void)
* background worker processes and then propagated (with or without
* modification) to the backend responsible for them.
*/
void
ThrowErrorData(ErrorData *edata)
void ThrowErrorData(ErrorData *edata)
{
ErrorData *newedata;
MemoryContext oldcontext;
ErrorData *newedata;
MemoryContext oldcontext;
if (!errstart(edata->elevel, edata->filename, edata->lineno,
edata->funcname, NULL))
return; /* error is not to be reported at all */
if (!errstart(edata->elevel, edata->filename, edata->lineno, edata->funcname, NULL)) {
/* error is not to be reported at all */
return;
}
newedata = &t_thrd.log_cxt.errordata[t_thrd.log_cxt.errordata_stack_depth];
t_thrd.log_cxt.recursion_depth++;
oldcontext = MemoryContextSwitchTo(ErrorContext);
newedata = &t_thrd.log_cxt.errordata[t_thrd.log_cxt.errordata_stack_depth];
t_thrd.log_cxt.recursion_depth++;
oldcontext = MemoryContextSwitchTo(ErrorContext);
/* Copy the supplied fields to the error stack entry. */
if (edata->sqlerrcode != 0)
newedata->sqlerrcode = edata->sqlerrcode;
if (edata->message)
newedata->message = pstrdup(edata->message);
if (edata->detail)
newedata->detail = pstrdup(edata->detail);
if (edata->detail_log)
newedata->detail_log = pstrdup(edata->detail_log);
if (edata->hint)
newedata->hint = pstrdup(edata->hint);
if (edata->context)
newedata->context = pstrdup(edata->context);
/* assume message_id is not available */
/* Copy the supplied fields to the error stack entry. */
if (edata->sqlerrcode != 0)
newedata->sqlerrcode = edata->sqlerrcode;
if (edata->message)
newedata->message = pstrdup(edata->message);
if (edata->detail)
newedata->detail = pstrdup(edata->detail);
if (edata->detail_log)
newedata->detail_log = pstrdup(edata->detail_log);
if (edata->hint)
newedata->hint = pstrdup(edata->hint);
if (edata->context)
newedata->context = pstrdup(edata->context);
/* assume message_id is not available */
if (newedata->filename)
newedata->filename = pstrdup(edata->filename);
if (newedata->funcname)
@ -1683,16 +1694,16 @@ ThrowErrorData(ErrorData *edata)
if (newedata->backtrace_log)
newedata->backtrace_log = pstrdup(edata->backtrace_log);
newedata->cursorpos = edata->cursorpos;
newedata->internalpos = edata->internalpos;
if (edata->internalquery)
newedata->internalquery = pstrdup(edata->internalquery);
newedata->cursorpos = edata->cursorpos;
newedata->internalpos = edata->internalpos;
if (edata->internalquery)
newedata->internalquery = pstrdup(edata->internalquery);
MemoryContextSwitchTo(oldcontext);
t_thrd.log_cxt.recursion_depth--;
MemoryContextSwitchTo(oldcontext);
t_thrd.log_cxt.recursion_depth--;
/* Process the error. */
errfinish(0);
/* Process the error. */
errfinish(0);
}
/*

View File

@ -198,6 +198,8 @@ static int check_line_validity_in_for_query(PLpgSQL_stmt_forq* stmt, int, int);
static void bind_cursor_with_portal(Portal portal, PLpgSQL_execstate *estate, int varno);
static char* transform_anonymous_block(char* query);
static bool need_recompile_plan(SPIPlanPtr plan);
static void build_symbol_table(PLpgSQL_execstate *estate, PLpgSQL_nsitem *ns_start,
int *ret_nitems, const char ***ret_names, Oid **ret_types);
/* ----------
* plpgsql_check_line_validity Called by the debugger plugin for
@ -1413,7 +1415,12 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block)
bool savedIsStp = u_sess->SPI_cxt.is_stp;
TransactionId oldTransactionId = InvalidTransactionId;
/* autonomous transaction */
if (block->autonomous) {
if (block->exceptions != NULL) {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Un-support feature : Autonomous transaction doesnot support exception")));
}
if (estate->func->fn_is_trigger) {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@ -1747,9 +1754,10 @@ static int exec_stmt_block(PLpgSQL_execstate* estate, PLpgSQL_stmt_block* block)
}
estate->err_text = NULL;
if (block->autonomous)
AutonomousSessionEnd(estate->autonomous_session);
if (block->autonomous) {
AutonomousSessionEnd(estate->autonomous_session);
}
/*
* Handle the return code.
*/
@ -2123,12 +2131,38 @@ static int exec_stmt_assign(PLpgSQL_execstate* estate, PLpgSQL_stmt_assign* stmt
static int exec_stmt_perform(PLpgSQL_execstate* estate, PLpgSQL_stmt_perform* stmt)
{
PLpgSQL_expr* expr = stmt->expr;
TransactionId oldTransactionId = InvalidTransactionId;
int rc;
TransactionId oldTransactionId = InvalidTransactionId;
/* autonomous transaction */
if (estate->autonomous_session) {
if (expr) {
int nparams = 0;
int i;
const char **param_names = NULL;
Oid *param_types = NULL;
AutonomousPreparedStatement *astmt = NULL;
Datum *values = NULL;
bool *nulls = NULL;
AutonomousResult *aresult = NULL;
ereport(LOG, (errmsg("query %s", expr->query)));
build_symbol_table(estate, expr->ns, &nparams, &param_names, &param_types);
astmt = AutonomousSessionPrepare(estate->autonomous_session, expr->query, (int16)nparams, param_types, param_names);
if (!RecoveryInProgress()) {
values = (Datum *)palloc(nparams * sizeof(*values));
nulls = (bool *)palloc(nparams * sizeof(*nulls));
for (i = 0; i < nparams; i++) {
nulls[i] = true;
}
aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls);
exec_set_found(estate, (list_length(aresult->tuples) != 0));
return PLPGSQL_RC_OK;
} else {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Syntax error: perform error")));
}
}
if (!RecoveryInProgress())
oldTransactionId = GetTopTransactionId();
}
rc = exec_run_select(estate, expr, 0, NULL);
if (rc != SPI_OK_SELECT) {
@ -3898,29 +3932,6 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st
oldTransactionId = GetTopTransactionId();
}
if (estate->autonomous_session) {
int nparams = 0;
int i;
const char **param_names = NULL;
Oid *param_types = NULL;
AutonomousPreparedStatement *astmt = NULL;
Datum *values = NULL;
bool *nulls = NULL;
AutonomousResult *aresult = NULL;
t_thrd.autonomous_cxt.sqlstmt = stmt->sqlstmt;
build_symbol_table(estate, stmt->sqlstmt->ns, &nparams, &param_names, &param_types);
astmt = AutonomousSessionPrepare(estate->autonomous_session, stmt->sqlstmt->query, (int16)nparams, param_types, param_names);
values = (Datum *)palloc(nparams * sizeof(*values));
nulls = (bool *)palloc(nparams * sizeof(*nulls));
for (i = 0; i < nparams; i++) {
nulls[i] = true;
}
aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls);
exec_set_found(estate, (list_length(aresult->tuples) != 0));
return PLPGSQL_RC_OK;
}
/*
* On the first call for this statement generate the plan, and detect
* whether the statement is INSERT/UPDATE/DELETE/MERGE
@ -3955,6 +3966,41 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st
* Set up ParamListInfo (hook function and possibly data values)
*/
paramLI = setup_param_list(estate, expr);
/* autonomous transaction */
if (estate->autonomous_session) {
int nparams = 0;
int i;
const char **param_names = NULL;
Oid *param_types = NULL;
AutonomousPreparedStatement *astmt = NULL;
Datum *values = NULL;
bool *nulls = NULL;
AutonomousResult *aresult = NULL;
t_thrd.autonomous_cxt.sqlstmt = stmt->sqlstmt;
build_symbol_table(estate, stmt->sqlstmt->ns, &nparams, &param_names, &param_types);
if (paramLI) {
for (i = 0; i < paramLI->numParams; i++) {
ParamExternData* param = &paramLI->params[i];
if (!param->isnull) {
pfree_ext(paramLI);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Autonomous transaction doesnot suport variable transmition")));
}
}
}
astmt = AutonomousSessionPrepare(estate->autonomous_session, stmt->sqlstmt->query, (int16)nparams, param_types, param_names);
values = (Datum *)palloc(nparams * sizeof(*values));
nulls = (bool *)palloc(nparams * sizeof(*nulls));
for (i = 0; i < nparams; i++) {
nulls[i] = true;
}
aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls);
exec_set_found(estate, (list_length(aresult->tuples) != 0));
return PLPGSQL_RC_OK;
}
/*
* If we have INTO, then we only need one row back ... but if we have INTO
@ -3979,7 +4025,7 @@ static int exec_stmt_execsql(PLpgSQL_execstate* estate, PLpgSQL_stmt_execsql* st
}
} else {
tcount = 0;
}
}
saved_cursor_data = estate->cursor_return_data;
if (stmt->row != NULL && stmt->row->nfields > 0) {
@ -4334,11 +4380,10 @@ static int exec_stmt_dynexecute(PLpgSQL_execstate* estate, PLpgSQL_stmt_dynexecu
exec_eval_cleanup(estate);
if (estate->autonomous_session)
{
(void *)AutonomousSessionExecute(estate->autonomous_session, querystr);
return PLPGSQL_RC_OK;
}
if (estate->autonomous_session) {
(void *)AutonomousSessionExecute(estate->autonomous_session, querystr);
return PLPGSQL_RC_OK;
}
if (stmt->params != NULL) {
stmt->ppd = (void*)exec_eval_using_params(estate, stmt->params);
@ -5084,6 +5129,7 @@ static int exec_stmt_null(PLpgSQL_execstate* estate, PLpgSQL_stmt* stmt)
*/
static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt)
{
/* autonomous transaction */
if (estate->autonomous_session) {
if (t_thrd.autonomous_cxt.sqlstmt) {
int nparams = 0;
@ -5100,8 +5146,7 @@ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt
values = (Datum *)palloc(nparams * sizeof(*values));
nulls = (bool *)palloc(nparams * sizeof(*nulls));
for (i = 0; i < nparams; i++)
{
for (i = 0; i < nparams; i++) {
nulls[i] = true;
}
aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls);
@ -5113,7 +5158,7 @@ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt
errmsg("Syntax error: In antonomous transaction, commit/rollback must match start transaction")));
}
}
const char* PORTAL = "Portal";
int subTransactionCount = u_sess->SPI_cxt.portal_stp_exception_counter;
@ -5176,6 +5221,7 @@ static int exec_stmt_commit(PLpgSQL_execstate* estate, PLpgSQL_stmt_commit* stmt
*/
static int exec_stmt_rollback(PLpgSQL_execstate* estate, PLpgSQL_stmt_rollback* stmt)
{
/* autonomous transaction */
if (estate->autonomous_session) {
if (t_thrd.autonomous_cxt.sqlstmt) {
int nparams = 0;
@ -5192,8 +5238,7 @@ static int exec_stmt_rollback(PLpgSQL_execstate* estate, PLpgSQL_stmt_rollback*
values = (Datum *)palloc(nparams * sizeof(*values));
nulls = (bool *)palloc(nparams * sizeof(*nulls));
for (i = 0; i < nparams; i++)
{
for (i = 0; i < nparams; i++) {
nulls[i] = true;
}
aresult = AutonomousSessionExecutePrepared(astmt, (int16)nparams, values, nulls);
@ -5203,9 +5248,8 @@ static int exec_stmt_rollback(PLpgSQL_execstate* estate, PLpgSQL_stmt_rollback*
} else {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Syntax error: In antonomous transaction, commit/rollback must match start transaction")));
}
}
}
const char* PORTAL = "Portal";
int subTransactionCount = u_sess->SPI_cxt.portal_stp_exception_counter;

View File

@ -105,9 +105,7 @@ struct BackgroundWorkerHandle {
static const struct {
const char *fn_name;
bgworker_main_type fn_addr;
} InternalBGWorkers[] =
{
} InternalBGWorkers[] = {
{
"autonomous_worker_main",
autonomous_worker_main
@ -140,8 +138,8 @@ void BackgroundWorkerShmemInit(void)
bool found;
t_thrd.bgworker_cxt.background_worker_data = (BackgroundWorkerArray*)ShmemInitStruct("Background Worker Data",
BackgroundWorkerShmemSize(),
&found);
BackgroundWorkerShmemSize(),
&found);
if (!IsUnderPostmaster) {
slist_iter siter;
int slotno = 0;
@ -191,7 +189,7 @@ void BackgroundWorkerShmemInit(void)
* Search the postmaster's backend-private list of RegisteredBgWorker objects
* for the one that maps to the given slot number.
*/
static RegisteredBgWorker * FindRegisteredWorkerBySlotNumber(int slotno)
static RegisteredBgWorker* FindRegisteredWorkerBySlotNumber(int slotno)
{
slist_iter siter;
@ -222,7 +220,8 @@ void BackgroundWorkerStateChange(void)
* max_background_workers, in case shared memory gets corrupted while we're
* looping.
*/
if (g_instance.attr.attr_storage.max_background_workers != t_thrd.bgworker_cxt.background_worker_data->total_slots) {
if (g_instance.attr.attr_storage.max_background_workers !=
t_thrd.bgworker_cxt.background_worker_data->total_slots) {
elog(LOG,
"inconsistent background worker state (max_background_workers=%d, total_slots=%d",
g_instance.attr.attr_storage.max_background_workers,
@ -582,7 +581,7 @@ static bool SanityCheckBackgroundWorker(BackgroundWorker *worker, int elevel)
}
if ((worker->bgw_restart_time < 0 &&
worker->bgw_restart_time != BGW_NEVER_RESTART) ||
worker->bgw_restart_time != BGW_NEVER_RESTART) ||
(worker->bgw_restart_time > USECS_PER_DAY / 1000)) {
ereport(elevel,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -917,7 +916,7 @@ void RegisterBackgroundWorker(BackgroundWorker *worker)
* free this pointer using pfree(), if desired.
*/
bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
BackgroundWorkerHandle **handle)
BackgroundWorkerHandle **handle)
{
int slotno;
bool success = false;
@ -954,11 +953,11 @@ bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
* anything useful.
*/
if (parallel && (int)(t_thrd.bgworker_cxt.background_worker_data->parallel_register_count -
t_thrd.bgworker_cxt.background_worker_data->parallel_terminate_count) >=
t_thrd.bgworker_cxt.background_worker_data->parallel_terminate_count) >=
g_instance.shmem_cxt.max_parallel_workers) {
Assert(t_thrd.bgworker_cxt.background_worker_data->parallel_register_count -
t_thrd.bgworker_cxt.background_worker_data->parallel_terminate_count <=
MAX_PARALLEL_WORKER_LIMIT);
t_thrd.bgworker_cxt.background_worker_data->parallel_terminate_count <=
MAX_PARALLEL_WORKER_LIMIT);
LWLockRelease(BackgroundWorkerLock);
return false;
}
@ -1108,7 +1107,6 @@ BgwHandleStatus WaitForBackgroundWorkerStartup(const BackgroundWorkerHandle *han
rc = WaitLatch(&t_thrd.proc->procLatch,
WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
if (rc & WL_POSTMASTER_DEATH) {
status = BGWH_POSTMASTER_DIED;
break;
@ -1145,7 +1143,6 @@ BgwHandleStatus WaitForBackgroundWorkerShutdown(const BackgroundWorkerHandle *ha
rc = WaitLatch(&t_thrd.proc->procLatch,
WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
if (rc & WL_POSTMASTER_DEATH) {
status = BGWH_POSTMASTER_DIED;
break;
@ -1185,6 +1182,13 @@ void TerminateBackgroundWorker(const BackgroundWorkerHandle *handle)
}
}
void StopBackgroundWorker()
{
TerminateBackgroundWorker(t_thrd.autonomous_cxt.handle);
(void)WaitForBackgroundWorkerShutdown(t_thrd.autonomous_cxt.handle);
t_thrd.autonomous_cxt.handle = NULL;
}
/*
* Look up (and possibly load) a bgworker entry point function.
*
@ -1234,7 +1238,7 @@ static bgworker_main_type LookupBackgroundWorkerFunction(const char *libraryname
* to be used before calling this function again. This is so that the caller
* doesn't have to worry about the background worker locking protocol.
*/
const char * GetBackgroundWorkerTypeByPid(ThreadId pid)
const char* GetBackgroundWorkerTypeByPid(ThreadId pid)
{
int slotno;
bool found = false;
@ -1325,4 +1329,3 @@ void BackgroundWorkerUnblockSignals(void)
(void)gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL);
}

View File

@ -108,7 +108,7 @@ static void forward_NotifyResponse(StringInfo msg);
static void rethrow_errornotice(StringInfo msg);
static void invalid_protocol_message(char msgtype);
AutonomousSession * AutonomousSessionStart(void)
AutonomousSession *AutonomousSessionStart(void)
{
BackgroundWorker worker = {0};
ThreadId pid;
@ -562,7 +562,9 @@ void autonomous_worker_main(Datum main_arg)
shm_mq_set_sender(response_mq, t_thrd.proc);
response_qh = shm_mq_attach(response_mq, seg, NULL);
pq_redirect_to_shm_mq(response_qh);
if (!t_thrd.msqueue_cxt.is_changed) {
pq_redirect_to_shm_mq(response_qh);
}
BackgroundWorkerInitializeConnectionByOid(fdata->database_id,
fdata->authenticated_user_id);
@ -730,6 +732,7 @@ void autonomous_worker_main(Datum main_arg)
break;
}
case 'X':
case 'N':
break;
default:
ereport(ERROR,
@ -813,15 +816,12 @@ static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg)
for (i = 0; i < natts; i++) {
int32 len = pq_getmsgint(msg, 4);
if (len < 0)
nulls[i] = true;
else {
Oid recvid;
Oid typioparams;
nulls[i] = false;
getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid,
&recvid,
&typioparams);
@ -849,17 +849,14 @@ static void forward_NotifyResponse(StringInfo msg)
NotifyMyFrontEnd(channel, payload, pid);
}
static void rethrow_errornotice(StringInfo msg)
{
ErrorData edata;
pq_parse_errornotice(msg, &edata);
edata.elevel = Min(edata.elevel, ERROR);
ThrowErrorData(&edata);
}
static void invalid_protocol_message(char msgtype)
{
ereport(ERROR,

View File

@ -205,7 +205,6 @@ static void get_query_result(TupleTableSlot* slot, DestReceiver* self);
* @hdfs
* Define different mesage type used for exec_simple_query
*/
//typedef enum { QUERY_MESSAGE = 0, HYBRID_MESSAGE } MessageType;
/* ----------------------------------------------------------------
* decls for routines only used in this file
@ -237,8 +236,6 @@ extern void CancelAutoAnalyze();
extern List* RevalidateCachedQuery(CachedPlanSource* plansource);
static void InitRecursiveCTEGlobalVariables(const PlannedStmt* planstmt);
THR_LOCAL bool needEnd = true;
bool StreamThreadAmI()
{
return (t_thrd.role == STREAM_WORKER);
@ -7400,12 +7397,12 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam
int curTryCounter;
int* oldTryCounter = NULL;
if (sigsetjmp(local_sigjmp_buf, 1) != 0) {
if (t_thrd.msqueue_cxt.is_changed == true) {
/* error process of autonomous transaction */
if (t_thrd.msqueue_cxt.is_changed) {
pq_stop_redirect_to_shm_mq();
}
if (t_thrd.autonomous_cxt.handle) {
TerminateBackgroundWorker(t_thrd.autonomous_cxt.handle);
t_thrd.autonomous_cxt.handle = NULL;
StopBackgroundWorker();
}
gstrace_tryblock_exit(true, oldTryCounter);

View File

@ -17,14 +17,12 @@
#include "storage/shm_toc.h"
#include "storage/spin.h"
struct shm_toc_entry
{
struct shm_toc_entry {
uint64 key; /* Arbitrary identifier */
uint64 offset; /* Bytes offset */
};
struct shm_toc
{
struct shm_toc {
uint64 toc_magic; /* Magic number for this TOC */
slock_t toc_mutex; /* Spinlock for mutual exclusion */
Size toc_total_bytes; /* Bytes managed by this TOC */
@ -93,19 +91,17 @@ extern void *shm_toc_allocate(shm_toc *toc, Size nbytes)
total_bytes = vtoc->toc_total_bytes;
allocated_bytes = vtoc->toc_allocated_bytes;
nentry = vtoc->toc_nentry;
toc_bytes = offsetof(shm_toc, toc_entry) +nentry * sizeof(shm_toc_entry)
toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry)
+ allocated_bytes;
/* Check for memory exhaustion and overflow. */
if (toc_bytes + nbytes > total_bytes || toc_bytes + nbytes < toc_bytes)
{
if (toc_bytes + nbytes > total_bytes || toc_bytes + nbytes < toc_bytes) {
SpinLockRelease(&toc->toc_mutex);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
}
vtoc->toc_allocated_bytes += nbytes;
SpinLockRelease(&toc->toc_mutex);
return ((char *) toc) + (total_bytes - allocated_bytes - nbytes);
@ -128,7 +124,7 @@ extern Size shm_toc_freespace(shm_toc *toc)
nentry = vtoc->toc_nentry;
SpinLockRelease(&toc->toc_mutex);
toc_bytes = offsetof(shm_toc, toc_entry) +nentry * sizeof(shm_toc_entry);
toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry);
Assert(allocated_bytes + BUFFERALIGN(toc_bytes) <= total_bytes);
return total_bytes - (allocated_bytes + BUFFERALIGN(toc_bytes));
}
@ -153,8 +149,7 @@ extern Size shm_toc_freespace(shm_toc *toc)
* pointers that they need to bootstrap. If you're storing a lot of stuff in
* here, you're doing it wrong.
*/
void
shm_toc_insert(shm_toc *toc, uint64 key, void *address)
void shm_toc_insert(shm_toc *toc, uint64 key, void *address)
{
volatile shm_toc *vtoc = toc;
uint64 total_bytes;
@ -172,13 +167,12 @@ shm_toc_insert(shm_toc *toc, uint64 key, void *address)
total_bytes = vtoc->toc_total_bytes;
allocated_bytes = vtoc->toc_allocated_bytes;
nentry = vtoc->toc_nentry;
toc_bytes = offsetof(shm_toc, toc_entry) +nentry * sizeof(shm_toc_entry)
toc_bytes = offsetof(shm_toc, toc_entry) + nentry * sizeof(shm_toc_entry)
+ allocated_bytes;
/* Check for memory exhaustion and overflow. */
if (toc_bytes + sizeof(shm_toc_entry) > total_bytes ||
toc_bytes + sizeof(shm_toc_entry) < toc_bytes)
{
toc_bytes + sizeof(shm_toc_entry) < toc_bytes) {
SpinLockRelease(&toc->toc_mutex);
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@ -232,8 +226,7 @@ void *shm_toc_lookup(shm_toc *toc, uint64 key)
* Estimate how much shared memory will be required to store a TOC and its
* dependent data structures.
*/
Size
shm_toc_estimate(shm_toc_estimator *e)
Size shm_toc_estimate(shm_toc_estimator *e)
{
return add_size(offsetof(shm_toc, toc_entry),
add_size(mul_size(e->number_of_keys, sizeof(shm_toc_entry)),

View File

@ -114,6 +114,9 @@
/* the maximum number of autovacuum launcher thread */
#define AV_LAUNCHER_PROCS 2
/* the number of Job Schedule Lancher thread */
#define JOB_SCHEDULE_LAUNCHER_PROCS 1
#ifdef ENABLE_MULTIPLE_NODES
#define STREAM_RESERVE_PROC_TIMES (16)
#else

View File

@ -123,6 +123,7 @@ extern BgwHandleStatus WaitForBackgroundWorkerStartup(const BackgroundWorkerHand
extern BgwHandleStatus WaitForBackgroundWorkerShutdown(const BackgroundWorkerHandle *handle);
extern const char *GetBackgroundWorkerTypeByPid(ThreadId pid);
extern void StopBackgroundWorker();
/* Terminate a bgworker */
extern void TerminateBackgroundWorker(const BackgroundWorkerHandle *handle);

View File

@ -40,8 +40,7 @@ extern void *shm_toc_lookup(shm_toc *toc, uint64 key);
* Tools for estimating how large a chunk of shared memory will be needed
* to store a TOC and its dependent objects.
*/
typedef struct
{
typedef struct {
Size space_for_chunks;
Size number_of_keys;
} shm_toc_estimator;

View File

@ -25,8 +25,7 @@ typedef struct AutonomousPreparedStatement AutonomousPreparedStatement;
struct autonomous_session_fixed_data;
typedef struct autonomous_session_fixed_data autonomous_session_fixed_data;
typedef struct AutonomousResult
{
typedef struct AutonomousResult {
TupleDesc tupdesc;
List *tuples;
char *command;
@ -36,7 +35,7 @@ AutonomousSession *AutonomousSessionStart(void);
void AutonomousSessionEnd(AutonomousSession *session);
AutonomousResult *AutonomousSessionExecute(AutonomousSession *session, const char *sql);
AutonomousPreparedStatement *AutonomousSessionPrepare(AutonomousSession *session, const char *sql, int16 nargs,
Oid argtypes[], const char *argnames[]);
Oid argtypes[], const char *argnames[]);
AutonomousResult *AutonomousSessionExecutePrepared(AutonomousPreparedStatement *stmt, int16 nargs, Datum *values, bool *nulls);
extern void autonomous_worker_main(Datum main_arg);

View File

@ -544,8 +544,7 @@ extern inline int defence_errlevel(void)
return ERROR;
#endif
}
#endif
#endif
/*
* Write errors to stderr (or by equal means when stderr is