!6619 修复池化下备机写转发在备机自己内部报错后无法转ROLLBACK给主机的问题

Merge pull request !6619 from cchen676/240710master
This commit is contained in:
opengauss_bot
2024-11-06 08:54:12 +00:00
committed by Gitee
2 changed files with 45 additions and 44 deletions

View File

@ -4058,10 +4058,6 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback)
}
#endif
if (SS_STANDBY_MODE_WITH_REMOTE_EXECUTE && !libpqsw_is_end()) {
libpqsw_disconnect(true);
}
s->savepointList = NULL;
TwoPhaseCommit = false;

View File

@ -786,62 +786,51 @@ static CachedPlanSource* libpqsw_get_plancache(StringInfo msg, int qtype)
* if B message begin, we need search local plancache to query if
* it is start transaction command.
*/
static void libpqsw_process_bind_message(StringInfo msg)
static bool libpqsw_process_bind_message(StringInfo msg, CachedPlanSource* psrc)
{
if (get_redirect_manager()->messages_manager.message_empty()
&& libpqsw_remote_in_transaction()) {
libpqsw_set_transaction(true);
return;
}
if (libpqsw_get_transaction() || libpqsw_get_set_command()) {
return;
if (libpqsw_get_set_command()) {
return true;
}
CachedPlanSource* psrc = libpqsw_get_plancache(msg, 'B');
if (psrc == NULL) {
return;
}
libpqsw_set_command_tag(psrc->commandTag);
(void)libpqsw_before_redirect(psrc->commandTag, psrc->query_list, psrc->query_string);
if (SS_STANDBY_MODE && libpqsw_get_transaction()) {
(void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND);
}
return false;
}
/*
* this message obviously need judge if need transfer.
*/
static void libpqsw_process_transfer_message(int qtype, StringInfo msg)
static bool libpqsw_process_transfer_message(int qtype, StringInfo msg)
{
if (libpqsw_redirect() && (qtype == 'B')) {
if (qtype == 'E') {
if (libpqsw_remote_in_transaction()) {
libpqsw_set_transaction(true);
}
} else if (qtype == 'B' || qtype == 'U') {
CachedPlanSource* psrc = libpqsw_get_plancache(msg, qtype);
if (psrc != NULL) {
libpqsw_set_command_tag(psrc->commandTag);
libpqsw_before_redirect(psrc->commandTag, psrc->query_list, psrc->query_string);
if (IsAbortedTransactionBlockState() && !libpqsw_end_command(psrc->commandTag)) {
return false;
}
if (qtype == 'B' && libpqsw_process_bind_message(msg, psrc)) {
return true;
} else if (qtype == 'U') {
libpqsw_set_batch(true);
}
if (SS_STANDBY_MODE && libpqsw_get_transaction()) {
(void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND);
}
}
return;
}
if (qtype == 'U') {
libpqsw_set_batch(true);
if (SS_STANDBY_MODE && libpqsw_get_transaction()) {
CachedPlanSource* psrc = libpqsw_get_plancache(msg, qtype);
if (psrc != NULL) {
(void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND);
}
}
} else if (qtype == 'B') {
libpqsw_process_bind_message(msg);
} else if (qtype == 'E') {
if (libpqsw_remote_in_transaction()) {
libpqsw_set_transaction(true);
}
} else {
// nothing to do
}
return true;
}
static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool afterpush, bool remote_execute)
@ -856,7 +845,9 @@ static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool aft
}
/* For B E */
if ((redirect_manager->ss_standby_state & (SS_STANDBY_REQ_BEGIN | SS_STANDBY_REQ_END)) && afterpush &&
if (((redirect_manager->ss_standby_state & SS_STANDBY_REQ_BEGIN) ||
(redirect_manager->ss_standby_state & SS_STANDBY_REQ_END)) &&
afterpush &&
!remote_execute) {
ret = true;
return ret;
@ -864,7 +855,8 @@ static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool aft
/* For S */
if (remote_execute) {
if (redirect_manager->ss_standby_state & (SS_STANDBY_REQ_BEGIN | SS_STANDBY_REQ_END)) {
if ((redirect_manager->ss_standby_state & SS_STANDBY_REQ_BEGIN) ||
(redirect_manager->ss_standby_state & SS_STANDBY_REQ_END)) {
ret = true;
}
@ -1014,7 +1006,7 @@ bool libpqsw_only_localrun()
*/
bool libpqsw_process_message(int qtype, StringInfo msg)
{
if (IsAbortedTransactionBlockState() || u_sess->proc_cxt.clientIsCMAgent) {
if (u_sess->proc_cxt.clientIsCMAgent) {
return false;
}
@ -1054,7 +1046,10 @@ bool libpqsw_process_message(int qtype, StringInfo msg)
return false;
}
// process U B E msg
libpqsw_process_transfer_message(qtype, msg);
if (!libpqsw_process_transfer_message(qtype, msg)) {
return false;
}
bool ready_to_excute = false;
if (libpqsw_get_set_command()) {
ready_to_excute = redirect_manager->push_message(qtype, msg, false, RT_SET);
@ -1102,6 +1097,10 @@ bool libpqsw_process_parse_message(const char* commandTag, List* query_list)
libpqsw_set_command_tag(commandTag);
bool need_redirect = libpqsw_before_redirect(commandTag, query_list, NULL);
if (IsAbortedTransactionBlockState() && !libpqsw_end_command(commandTag)) {
need_redirect = false;
}
if (need_redirect && SS_STANDBY_MODE &&
libpqsw_need_localexec_forSimpleQuery(commandTag, query_list, LIBPQ_SW_PARSE)) {
if (get_redirect_manager()->ss_standby_state & SS_STANDBY_REQ_SELECT) {
@ -1124,6 +1123,7 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con
return false;
}
bool enableCe = false;
libpqsw_set_command_tag(commandTag);
bool need_redirect = libpqsw_before_redirect(commandTag, query_list, query_string);
if (need_redirect && !libpqsw_need_localexec_forSimpleQuery(commandTag, query_list, LIBPQ_SW_QUERY)) {
@ -1147,9 +1147,10 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con
}
}
enableCe = get_redirect_manager()->state.client_enable_ce;
// because we are not skip Q message process, so send_ready_for_query will be true after transfer.
// but after transter, master will send Z message for front, so we not need to this flag.
if (get_redirect_manager()->state.client_enable_ce || libpqsw_end_command(commandTag) ||
if (enableCe || libpqsw_end_command(commandTag) ||
libpqsw_begin_command(commandTag)) {
libpqsw_set_end(true);
} else {
@ -1165,6 +1166,9 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con
if (get_sw_cxt()->streamConn->xactStatus == PQTRANS_INERROR) {
libpqsw_disconnect(true);
AbortCurrentTransaction();
if (!enableCe) {
libpqsw_set_end(false);
}
}
} else {
// we need send_ready_for_query for init.
@ -1203,6 +1207,7 @@ bool libpqsw_begin_command(const char* commandTag)
bool libpqsw_end_command(const char* commandTag)
{
return commandTag != NULL && (strcmp(commandTag, "COMMIT") == 0 ||
strcmp(commandTag, "PREPARE TRANSACTION") == 0 ||
(strcmp(commandTag, "ROLLBACK") == 0 && !(get_redirect_manager()->state.have_savepoint)));
}
@ -1328,9 +1333,9 @@ void libpqsw_disconnect(bool clear_queue)
{
RedirectManager* redirect_manager = (RedirectManager*)get_sw_cxt()->redirect_manager;
ereport(LIBPQSW_DEFAULT_LOG_LEVEL,
(errmsg("libpqsw(%ld): libpqsw_disconnect called, conn is null:%s",
(errmsg("libpqsw(%ld): libpqsw_disconnect called, conn is null:%s, clear_queue:%d",
redirect_manager == NULL ? -1 : ((int64)(redirect_manager)),
get_sw_cxt()->streamConn == NULL ? "true" : "false")));
get_sw_cxt()->streamConn == NULL ? "true" : "false", clear_queue)));
RedirectMessageManager* message_manager = &(redirect_manager->messages_manager);
if (clear_queue && !(message_manager->message_empty())) {
message_manager->reset();