diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index 8cc017fb2..02775fbb7 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -4058,10 +4058,6 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback) } #endif - if (SS_STANDBY_MODE_WITH_REMOTE_EXECUTE && !libpqsw_is_end()) { - libpqsw_disconnect(true); - } - s->savepointList = NULL; TwoPhaseCommit = false; diff --git a/src/gausskernel/storage/replication/libpqsw.cpp b/src/gausskernel/storage/replication/libpqsw.cpp index 689c37ff0..98054bacf 100644 --- a/src/gausskernel/storage/replication/libpqsw.cpp +++ b/src/gausskernel/storage/replication/libpqsw.cpp @@ -786,62 +786,51 @@ static CachedPlanSource* libpqsw_get_plancache(StringInfo msg, int qtype) * if B message begin, we need search local plancache to query if * it is start transaction command. */ -static void libpqsw_process_bind_message(StringInfo msg) +static bool libpqsw_process_bind_message(StringInfo msg, CachedPlanSource* psrc) { if (get_redirect_manager()->messages_manager.message_empty() && libpqsw_remote_in_transaction()) { libpqsw_set_transaction(true); - return; } - if (libpqsw_get_transaction() || libpqsw_get_set_command()) { - return; + if (libpqsw_get_set_command()) { + return true; } - CachedPlanSource* psrc = libpqsw_get_plancache(msg, 'B'); - if (psrc == NULL) { - return; - } + libpqsw_set_command_tag(psrc->commandTag); (void)libpqsw_before_redirect(psrc->commandTag, psrc->query_list, psrc->query_string); - if (SS_STANDBY_MODE && libpqsw_get_transaction()) { - (void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND); - } + return false; } /* * this message obviously need judge if need transfer. */ -static void libpqsw_process_transfer_message(int qtype, StringInfo msg) +static bool libpqsw_process_transfer_message(int qtype, StringInfo msg) { - if (libpqsw_redirect() && (qtype == 'B')) { + if (qtype == 'E') { + if (libpqsw_remote_in_transaction()) { + libpqsw_set_transaction(true); + } + } else if (qtype == 'B' || qtype == 'U') { CachedPlanSource* psrc = libpqsw_get_plancache(msg, qtype); if (psrc != NULL) { - libpqsw_set_command_tag(psrc->commandTag); - libpqsw_before_redirect(psrc->commandTag, psrc->query_list, psrc->query_string); + if (IsAbortedTransactionBlockState() && !libpqsw_end_command(psrc->commandTag)) { + return false; + } + + if (qtype == 'B' && libpqsw_process_bind_message(msg, psrc)) { + return true; + } else if (qtype == 'U') { + libpqsw_set_batch(true); + } + if (SS_STANDBY_MODE && libpqsw_get_transaction()) { (void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND); } } - return; } - if (qtype == 'U') { - libpqsw_set_batch(true); - if (SS_STANDBY_MODE && libpqsw_get_transaction()) { - CachedPlanSource* psrc = libpqsw_get_plancache(msg, qtype); - if (psrc != NULL) { - (void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND); - } - } - } else if (qtype == 'B') { - libpqsw_process_bind_message(msg); - } else if (qtype == 'E') { - if (libpqsw_remote_in_transaction()) { - libpqsw_set_transaction(true); - } - } else { - // nothing to do - } + return true; } static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool afterpush, bool remote_execute) @@ -856,7 +845,9 @@ static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool aft } /* For B E */ - if ((redirect_manager->ss_standby_state & (SS_STANDBY_REQ_BEGIN | SS_STANDBY_REQ_END)) && afterpush && + if (((redirect_manager->ss_standby_state & SS_STANDBY_REQ_BEGIN) || + (redirect_manager->ss_standby_state & SS_STANDBY_REQ_END)) && + afterpush && !remote_execute) { ret = true; return ret; @@ -864,7 +855,8 @@ static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool aft /* For S */ if (remote_execute) { - if (redirect_manager->ss_standby_state & (SS_STANDBY_REQ_BEGIN | SS_STANDBY_REQ_END)) { + if ((redirect_manager->ss_standby_state & SS_STANDBY_REQ_BEGIN) || + (redirect_manager->ss_standby_state & SS_STANDBY_REQ_END)) { ret = true; } @@ -1014,7 +1006,7 @@ bool libpqsw_only_localrun() */ bool libpqsw_process_message(int qtype, StringInfo msg) { - if (IsAbortedTransactionBlockState() || u_sess->proc_cxt.clientIsCMAgent) { + if (u_sess->proc_cxt.clientIsCMAgent) { return false; } @@ -1054,7 +1046,10 @@ bool libpqsw_process_message(int qtype, StringInfo msg) return false; } // process U B E msg - libpqsw_process_transfer_message(qtype, msg); + if (!libpqsw_process_transfer_message(qtype, msg)) { + return false; + } + bool ready_to_excute = false; if (libpqsw_get_set_command()) { ready_to_excute = redirect_manager->push_message(qtype, msg, false, RT_SET); @@ -1102,6 +1097,10 @@ bool libpqsw_process_parse_message(const char* commandTag, List* query_list) libpqsw_set_command_tag(commandTag); bool need_redirect = libpqsw_before_redirect(commandTag, query_list, NULL); + if (IsAbortedTransactionBlockState() && !libpqsw_end_command(commandTag)) { + need_redirect = false; + } + if (need_redirect && SS_STANDBY_MODE && libpqsw_need_localexec_forSimpleQuery(commandTag, query_list, LIBPQ_SW_PARSE)) { if (get_redirect_manager()->ss_standby_state & SS_STANDBY_REQ_SELECT) { @@ -1124,6 +1123,7 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con return false; } + bool enableCe = false; libpqsw_set_command_tag(commandTag); bool need_redirect = libpqsw_before_redirect(commandTag, query_list, query_string); if (need_redirect && !libpqsw_need_localexec_forSimpleQuery(commandTag, query_list, LIBPQ_SW_QUERY)) { @@ -1147,9 +1147,10 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con } } + enableCe = get_redirect_manager()->state.client_enable_ce; // because we are not skip Q message process, so send_ready_for_query will be true after transfer. // but after transter, master will send Z message for front, so we not need to this flag. - if (get_redirect_manager()->state.client_enable_ce || libpqsw_end_command(commandTag) || + if (enableCe || libpqsw_end_command(commandTag) || libpqsw_begin_command(commandTag)) { libpqsw_set_end(true); } else { @@ -1165,6 +1166,9 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con if (get_sw_cxt()->streamConn->xactStatus == PQTRANS_INERROR) { libpqsw_disconnect(true); AbortCurrentTransaction(); + if (!enableCe) { + libpqsw_set_end(false); + } } } else { // we need send_ready_for_query for init. @@ -1203,6 +1207,7 @@ bool libpqsw_begin_command(const char* commandTag) bool libpqsw_end_command(const char* commandTag) { return commandTag != NULL && (strcmp(commandTag, "COMMIT") == 0 || + strcmp(commandTag, "PREPARE TRANSACTION") == 0 || (strcmp(commandTag, "ROLLBACK") == 0 && !(get_redirect_manager()->state.have_savepoint))); } @@ -1328,9 +1333,9 @@ void libpqsw_disconnect(bool clear_queue) { RedirectManager* redirect_manager = (RedirectManager*)get_sw_cxt()->redirect_manager; ereport(LIBPQSW_DEFAULT_LOG_LEVEL, - (errmsg("libpqsw(%ld): libpqsw_disconnect called, conn is null:%s", + (errmsg("libpqsw(%ld): libpqsw_disconnect called, conn is null:%s, clear_queue:%d", redirect_manager == NULL ? -1 : ((int64)(redirect_manager)), - get_sw_cxt()->streamConn == NULL ? "true" : "false"))); + get_sw_cxt()->streamConn == NULL ? "true" : "false", clear_queue))); RedirectMessageManager* message_manager = &(redirect_manager->messages_manager); if (clear_queue && !(message_manager->message_empty())) { message_manager->reset();