[共享存储] forbid mes_task_proc looping infinitely

This commit is contained in:
dongning12
2023-02-02 11:09:46 +08:00
parent acb6439192
commit ed3d27a228
4 changed files with 43 additions and 19 deletions

View File

@ -167,7 +167,7 @@ CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCo
* xid -> clog status
* true if given transaction committed
*/
bool SSTransactionIdDidCommit(TransactionId transactionId)
bool SSTransactionIdDidCommit(TransactionId transactionId, bool* ret_did_commit)
{
bool did_commit = false;
bool remote_get = false;
@ -191,11 +191,10 @@ bool SSTransactionIdDidCommit(TransactionId transactionId)
if (!did_commit) {
dms_context_t dms_ctx;
InitDmsContext(&dms_ctx);
dms_ctx.xid_ctx.xid = *(uint64 *)(&transactionId);
dms_ctx.xid_ctx.inst_id = (unsigned char)SS_MASTER_ID;
do {
dms_ctx.xid_ctx.inst_id = (unsigned char)SS_MASTER_ID;
if (dms_request_opengauss_txn_status(&dms_ctx, (uint8)XID_COMMITTED, (uint8 *)&did_commit)
== DMS_SUCCESS) {
remote_get = true;
@ -206,7 +205,10 @@ bool SSTransactionIdDidCommit(TransactionId transactionId)
} else {
if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER)) {
ereport(FATAL, (errmsg("SSTransactionIdDidCommit failed during reform, xid=%lu.", transactionId)));
} else if (SS_IN_REFORM) {
return false;
}
pg_usleep(USECS_PER_SEC);
continue;
}
@ -219,35 +221,35 @@ bool SSTransactionIdDidCommit(TransactionId transactionId)
t_thrd.xact_cxt.latestFetchXid = transactionId;
t_thrd.xact_cxt.latestFetchXidStatus = CLOG_XID_STATUS_COMMITTED;
}
return did_commit;
*ret_did_commit = did_commit;
return true;
}
/* xid -> clog status */
/* true if given transaction in progress */
bool SSTransactionIdIsInProgress(TransactionId transactionId)
bool SSTransactionIdIsInProgress(TransactionId transactionId, bool *in_progress)
{
bool in_progress = true;
dms_context_t dms_ctx;
InitDmsContext(&dms_ctx);
dms_ctx.xid_ctx.xid = *(uint64 *)(&transactionId);
dms_ctx.xid_ctx.inst_id = (unsigned char)SS_MASTER_ID;
dms_ctx.xid_ctx.xid = *(uint64 *)(&transactionId);
do {
if (dms_request_opengauss_txn_status(&dms_ctx, (uint8)XID_INPROGRESS, (uint8 *)&in_progress) == DMS_SUCCESS) {
dms_ctx.xid_ctx.inst_id = (unsigned char)SS_MASTER_ID;
if (dms_request_opengauss_txn_status(&dms_ctx, (uint8)XID_INPROGRESS, (uint8 *)in_progress) == DMS_SUCCESS) {
ereport(DEBUG1, (errmsg("SS get txn in_progress success, xid=%lu, in_progress=%d.",
transactionId, in_progress)));
transactionId, *in_progress)));
break;
} else {
if (SS_IN_REFORM && (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER)) {
ereport(FATAL, (errmsg("SSTransactionIdIsInProgress failed during reform, xid=%lu.", transactionId)));
} else if (SS_IN_REFORM) {
return false;
}
pg_usleep(USECS_PER_SEC);
continue;
}
} while (true);
return in_progress;
return true;
}
TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, uint16 t_infomask2)

View File

@ -393,8 +393,19 @@ Datum pgxc_get_csn(PG_FUNCTION_ARGS)
*/
bool TransactionIdDidCommit(TransactionId transactionId) /* true if given transaction committed */
{
if (SS_STANDBY_MODE) {
return SSTransactionIdDidCommit(transactionId);
while (ENABLE_DMS) {
if (SS_IN_REFORM && !SS_PRIMARY_DEMOTING) {
pg_usleep(USECS_PER_SEC);
continue;
} else if (SS_NORMAL_STANDBY) {
bool ret_did_commit = true;
if (SSTransactionIdDidCommit(transactionId, &ret_did_commit)) {
return ret_did_commit;
}
continue;
} else {
break;
}
}
CLogXidStatus xidstatus;

View File

@ -1295,8 +1295,19 @@ bool TransactionIdIsInProgress(TransactionId xid, uint32* needSync, bool shortcu
return false;
}
if (SS_STANDBY_MODE) {
return SSTransactionIdIsInProgress(xid);
while (ENABLE_DMS) {
if (SS_IN_REFORM && !SS_PRIMARY_DEMOTING) {
pg_usleep(USECS_PER_SEC);
continue;
} else if (SS_NORMAL_STANDBY) {
bool in_progress = true;
if (SSTransactionIdIsInProgress(xid, &in_progress)) {
return in_progress;
}
continue;
} else {
break;
}
}
/*

View File

@ -90,8 +90,8 @@ typedef struct SSBroadcasDbBackendsAck {
Snapshot SSGetSnapshotData(Snapshot snapshot);
CommitSeqNo SSTransactionIdGetCommitSeqNo(TransactionId transactionId, bool isCommit, bool isMvcc, bool isNest,
Snapshot snapshot, bool* sync);
bool SSTransactionIdDidCommit(TransactionId transactionId);
bool SSTransactionIdIsInProgress(TransactionId transactionId);
bool SSTransactionIdDidCommit(TransactionId transactionId, bool *ret_did_commit);
bool SSTransactionIdIsInProgress(TransactionId transactionId, bool *in_progress);
TransactionId SSMultiXactIdGetUpdateXid(TransactionId xmax, uint16 t_infomask, uint16 t_infomask2);
bool SSGetOldestXminFromAllStandby();
int SSGetOldestXmin(char *data, uint32 len, char *output_msg, uint32 *output_msg_len);