修复备机写转发在备机自己报错后无法转ROLLBACK给主机的问题
This commit is contained in:
@ -4058,10 +4058,6 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback)
|
||||
}
|
||||
#endif
|
||||
|
||||
if (SS_STANDBY_MODE_WITH_REMOTE_EXECUTE && !libpqsw_is_end()) {
|
||||
libpqsw_disconnect(true);
|
||||
}
|
||||
|
||||
s->savepointList = NULL;
|
||||
|
||||
TwoPhaseCommit = false;
|
||||
|
||||
@ -786,62 +786,51 @@ static CachedPlanSource* libpqsw_get_plancache(StringInfo msg, int qtype)
|
||||
* if B message begin, we need search local plancache to query if
|
||||
* it is start transaction command.
|
||||
*/
|
||||
static void libpqsw_process_bind_message(StringInfo msg)
|
||||
static bool libpqsw_process_bind_message(StringInfo msg, CachedPlanSource* psrc)
|
||||
{
|
||||
if (get_redirect_manager()->messages_manager.message_empty()
|
||||
&& libpqsw_remote_in_transaction()) {
|
||||
libpqsw_set_transaction(true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (libpqsw_get_transaction() || libpqsw_get_set_command()) {
|
||||
return;
|
||||
if (libpqsw_get_set_command()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
CachedPlanSource* psrc = libpqsw_get_plancache(msg, 'B');
|
||||
if (psrc == NULL) {
|
||||
return;
|
||||
}
|
||||
libpqsw_set_command_tag(psrc->commandTag);
|
||||
(void)libpqsw_before_redirect(psrc->commandTag, psrc->query_list, psrc->query_string);
|
||||
if (SS_STANDBY_MODE && libpqsw_get_transaction()) {
|
||||
(void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
* this message obviously need judge if need transfer.
|
||||
*/
|
||||
static void libpqsw_process_transfer_message(int qtype, StringInfo msg)
|
||||
static bool libpqsw_process_transfer_message(int qtype, StringInfo msg)
|
||||
{
|
||||
if (libpqsw_redirect() && (qtype == 'B')) {
|
||||
if (qtype == 'E') {
|
||||
if (libpqsw_remote_in_transaction()) {
|
||||
libpqsw_set_transaction(true);
|
||||
}
|
||||
} else if (qtype == 'B' || qtype == 'U') {
|
||||
CachedPlanSource* psrc = libpqsw_get_plancache(msg, qtype);
|
||||
if (psrc != NULL) {
|
||||
libpqsw_set_command_tag(psrc->commandTag);
|
||||
libpqsw_before_redirect(psrc->commandTag, psrc->query_list, psrc->query_string);
|
||||
if (IsAbortedTransactionBlockState() && !libpqsw_end_command(psrc->commandTag)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (qtype == 'B' && libpqsw_process_bind_message(msg, psrc)) {
|
||||
return true;
|
||||
} else if (qtype == 'U') {
|
||||
libpqsw_set_batch(true);
|
||||
}
|
||||
|
||||
if (SS_STANDBY_MODE && libpqsw_get_transaction()) {
|
||||
(void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (qtype == 'U') {
|
||||
libpqsw_set_batch(true);
|
||||
if (SS_STANDBY_MODE && libpqsw_get_transaction()) {
|
||||
CachedPlanSource* psrc = libpqsw_get_plancache(msg, qtype);
|
||||
if (psrc != NULL) {
|
||||
(void)libpqsw_need_localexec_forSimpleQuery(psrc->commandTag, psrc->query_list, LIBPQ_SW_BIND);
|
||||
}
|
||||
}
|
||||
} else if (qtype == 'B') {
|
||||
libpqsw_process_bind_message(msg);
|
||||
} else if (qtype == 'E') {
|
||||
if (libpqsw_remote_in_transaction()) {
|
||||
libpqsw_set_transaction(true);
|
||||
}
|
||||
} else {
|
||||
// nothing to do
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool afterpush, bool remote_execute)
|
||||
@ -856,7 +845,9 @@ static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool aft
|
||||
}
|
||||
|
||||
/* For B E */
|
||||
if ((redirect_manager->ss_standby_state & (SS_STANDBY_REQ_BEGIN | SS_STANDBY_REQ_END)) && afterpush &&
|
||||
if (((redirect_manager->ss_standby_state & SS_STANDBY_REQ_BEGIN) ||
|
||||
(redirect_manager->ss_standby_state & SS_STANDBY_REQ_END)) &&
|
||||
afterpush &&
|
||||
!remote_execute) {
|
||||
ret = true;
|
||||
return ret;
|
||||
@ -864,7 +855,8 @@ static bool libpqsw_need_localexec_withinPBE(int qtype, StringInfo msg, bool aft
|
||||
|
||||
/* For S */
|
||||
if (remote_execute) {
|
||||
if (redirect_manager->ss_standby_state & (SS_STANDBY_REQ_BEGIN | SS_STANDBY_REQ_END)) {
|
||||
if ((redirect_manager->ss_standby_state & SS_STANDBY_REQ_BEGIN) ||
|
||||
(redirect_manager->ss_standby_state & SS_STANDBY_REQ_END)) {
|
||||
ret = true;
|
||||
}
|
||||
|
||||
@ -1014,7 +1006,7 @@ bool libpqsw_only_localrun()
|
||||
*/
|
||||
bool libpqsw_process_message(int qtype, StringInfo msg)
|
||||
{
|
||||
if (IsAbortedTransactionBlockState() || u_sess->proc_cxt.clientIsCMAgent) {
|
||||
if (u_sess->proc_cxt.clientIsCMAgent) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -1054,7 +1046,10 @@ bool libpqsw_process_message(int qtype, StringInfo msg)
|
||||
return false;
|
||||
}
|
||||
// process U B E msg
|
||||
libpqsw_process_transfer_message(qtype, msg);
|
||||
if (!libpqsw_process_transfer_message(qtype, msg)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ready_to_excute = false;
|
||||
if (libpqsw_get_set_command()) {
|
||||
ready_to_excute = redirect_manager->push_message(qtype, msg, false, RT_SET);
|
||||
@ -1102,6 +1097,10 @@ bool libpqsw_process_parse_message(const char* commandTag, List* query_list)
|
||||
libpqsw_set_command_tag(commandTag);
|
||||
bool need_redirect = libpqsw_before_redirect(commandTag, query_list, NULL);
|
||||
|
||||
if (IsAbortedTransactionBlockState() && !libpqsw_end_command(commandTag)) {
|
||||
need_redirect = false;
|
||||
}
|
||||
|
||||
if (need_redirect && SS_STANDBY_MODE &&
|
||||
libpqsw_need_localexec_forSimpleQuery(commandTag, query_list, LIBPQ_SW_PARSE)) {
|
||||
if (get_redirect_manager()->ss_standby_state & SS_STANDBY_REQ_SELECT) {
|
||||
@ -1124,6 +1123,7 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con
|
||||
return false;
|
||||
}
|
||||
|
||||
bool enableCe = false;
|
||||
libpqsw_set_command_tag(commandTag);
|
||||
bool need_redirect = libpqsw_before_redirect(commandTag, query_list, query_string);
|
||||
if (need_redirect && !libpqsw_need_localexec_forSimpleQuery(commandTag, query_list, LIBPQ_SW_QUERY)) {
|
||||
@ -1147,9 +1147,10 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con
|
||||
}
|
||||
}
|
||||
|
||||
enableCe = get_redirect_manager()->state.client_enable_ce;
|
||||
// because we are not skip Q message process, so send_ready_for_query will be true after transfer.
|
||||
// but after transter, master will send Z message for front, so we not need to this flag.
|
||||
if (get_redirect_manager()->state.client_enable_ce || libpqsw_end_command(commandTag) ||
|
||||
if (enableCe || libpqsw_end_command(commandTag) ||
|
||||
libpqsw_begin_command(commandTag)) {
|
||||
libpqsw_set_end(true);
|
||||
} else {
|
||||
@ -1165,6 +1166,9 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con
|
||||
if (get_sw_cxt()->streamConn->xactStatus == PQTRANS_INERROR) {
|
||||
libpqsw_disconnect(true);
|
||||
AbortCurrentTransaction();
|
||||
if (!enableCe) {
|
||||
libpqsw_set_end(false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// we need send_ready_for_query for init.
|
||||
@ -1203,6 +1207,7 @@ bool libpqsw_begin_command(const char* commandTag)
|
||||
bool libpqsw_end_command(const char* commandTag)
|
||||
{
|
||||
return commandTag != NULL && (strcmp(commandTag, "COMMIT") == 0 ||
|
||||
strcmp(commandTag, "PREPARE TRANSACTION") == 0 ||
|
||||
(strcmp(commandTag, "ROLLBACK") == 0 && !(get_redirect_manager()->state.have_savepoint)));
|
||||
}
|
||||
|
||||
@ -1328,9 +1333,9 @@ void libpqsw_disconnect(bool clear_queue)
|
||||
{
|
||||
RedirectManager* redirect_manager = (RedirectManager*)get_sw_cxt()->redirect_manager;
|
||||
ereport(LIBPQSW_DEFAULT_LOG_LEVEL,
|
||||
(errmsg("libpqsw(%ld): libpqsw_disconnect called, conn is null:%s",
|
||||
(errmsg("libpqsw(%ld): libpqsw_disconnect called, conn is null:%s, clear_queue:%d",
|
||||
redirect_manager == NULL ? -1 : ((int64)(redirect_manager)),
|
||||
get_sw_cxt()->streamConn == NULL ? "true" : "false")));
|
||||
get_sw_cxt()->streamConn == NULL ? "true" : "false", clear_queue)));
|
||||
RedirectMessageManager* message_manager = &(redirect_manager->messages_manager);
|
||||
if (clear_queue && !(message_manager->message_empty())) {
|
||||
message_manager->reset();
|
||||
|
||||
Reference in New Issue
Block a user