!3686 [bug fix] 修复写转发开启后事务内执行的函数中包含DDL,执行报错的问题
Merge pull request !3686 from cchen676/1207master
This commit is contained in:
@ -47,6 +47,7 @@
|
||||
#include "utils/elog.h"
|
||||
#include "commands/sqladvisor.h"
|
||||
#include "distributelayer/streamMain.h"
|
||||
#include "replication/libpqsw.h"
|
||||
|
||||
#ifdef ENABLE_MOT
|
||||
#include "storage/mot/jit_exec.h"
|
||||
@ -2492,6 +2493,13 @@ void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan, parse_query_fun
|
||||
foreach (list_item, raw_parsetree_list) {
|
||||
Node *parsetree = (Node *)lfirst(list_item);
|
||||
CachedPlanSource *plansource = NULL;
|
||||
|
||||
#ifndef ENABLE_MULTIPLE_NODES
|
||||
if (g_instance.attr.attr_sql.enableRemoteExcute) {
|
||||
libpqsw_check_ddl_on_primary(CreateCommandTag(parsetree));
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef ENABLE_MULTIPLE_NODES
|
||||
if (IS_PGXC_COORDINATOR && PointerIsValid(query_string_locationlist) &&
|
||||
list_length(query_string_locationlist) > 1) {
|
||||
|
||||
@ -4022,6 +4022,10 @@ static void AbortTransaction(bool PerfectRollback, bool STP_rollback)
|
||||
}
|
||||
#endif
|
||||
|
||||
if (SS_STANDBY_MODE_WITH_REMOTE_EXECUTE) {
|
||||
libpqsw_disconnect();
|
||||
}
|
||||
|
||||
s->savepointList = NULL;
|
||||
|
||||
TwoPhaseCommit = false;
|
||||
|
||||
@ -257,7 +257,9 @@ static bool libpqsw_receive(bool transfer = true)
|
||||
if (u_sess->attr.attr_common.enable_full_encryption && *(conn->inBuffer) == 'Z') {
|
||||
get_redirect_manager()->state.client_enable_ce = true;
|
||||
} else {
|
||||
transfer_func(conn->inBuffer, conn->inEnd);
|
||||
if (get_sw_cxt()->streamConn->xactStatus != PQTRANS_INERROR) {
|
||||
transfer_func(conn->inBuffer, conn->inEnd);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1054,6 +1056,12 @@ bool libpqsw_process_query_message(const char* commandTag, List* query_list, con
|
||||
libpqsw_inner_excute_pbe(true, true);
|
||||
}
|
||||
}
|
||||
|
||||
if (get_sw_cxt()->streamConn->xactStatus == PQTRANS_INERROR) {
|
||||
libpqsw_disconnect();
|
||||
ereport(ERROR, (errmsg("The primary node report error when last request was transferred to it!")));
|
||||
}
|
||||
|
||||
// because we are not skip Q message process, so send_ready_for_query will be true after transfer.
|
||||
// but after transter, master will send Z message for front, so we not need to this flag.
|
||||
if (get_redirect_manager()->state.client_enable_ce || libpqsw_end_command(commandTag) ||
|
||||
@ -1114,7 +1122,7 @@ bool libpqsw_fetch_command(const char* commandTag)
|
||||
// is special commandTag need forbid redirect
|
||||
bool libpqsw_special_command(const char* commandTag)
|
||||
{
|
||||
return commandTag != NULL && strcmp(commandTag, "LOCK TABLE") == 0;
|
||||
return commandTag != NULL && (strcmp(commandTag, "LOCK TABLE") == 0 || strcmp(commandTag, "TRUNCATE TABLE") == 0);
|
||||
}
|
||||
|
||||
// is special commandTag need forbid redirect
|
||||
@ -1404,3 +1412,11 @@ bool libpqsw_send_pbe(const char* buffer, size_t buffer_size)
|
||||
conn->asyncStatus = PGASYNC_BUSY;
|
||||
return result;
|
||||
}
|
||||
|
||||
void libpqsw_check_ddl_on_primary(const char* commandTag)
|
||||
{
|
||||
if (commandTag != NULL && t_thrd.role == SW_SENDER && IsTransactionInProgressState() &&
|
||||
(set_command_type_by_commandTag(commandTag) == CMD_DDL || libpqsw_special_command(commandTag))) {
|
||||
ereport(ERROR, (errmsg("The multi-write feature doesn't support DDL within transaction or function!")));
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,6 +85,7 @@ bool libpqsw_only_localrun();
|
||||
void libpqsw_create_conn();
|
||||
void libpqsw_trace_q_msg(const char* commandTag, const char* queryString);
|
||||
void libpqsw_disconnect(void);
|
||||
void libpqsw_check_ddl_on_primary(const char* commandTag);
|
||||
|
||||
#ifdef _cplusplus
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user