diff --git a/src/gausskernel/process/tcop/postgres.cpp b/src/gausskernel/process/tcop/postgres.cpp index 5dbcf2032..a2eb7512d 100755 --- a/src/gausskernel/process/tcop/postgres.cpp +++ b/src/gausskernel/process/tcop/postgres.cpp @@ -7449,6 +7449,11 @@ static void InitGlobalNodeDefinition(PlannedStmt* planstmt) void InitThreadLocalWhenSessionExit() { t_thrd.postgres_cxt.xact_started = false; + if (u_sess != NULL) { + if (u_sess->libsw_cxt.redirect_manager == NULL) { + u_sess->libsw_cxt.redirect_manager = New(CurrentMemoryContext) RedirectManager(); + } + } } /* @@ -9395,7 +9400,7 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam instr_stmt_report_trace_id(u_sess->trace_cxt.trace_id); exec_parse_message(query_string, stmt_name, paramTypes, paramTypeNames, paramModes, numParams); if (libpqsw_redirect() || libpqsw_get_set_command()) { - ((RedirectManager*)t_thrd.libsw_cxt.redirect_manager)->push_message(firstchar, + get_redirect_manager()->push_message(firstchar, &input_message, true, libpqsw_get_set_command() ? RT_SET : RT_NORMAL @@ -9630,7 +9635,8 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam break; } - if (t_thrd.postgres_cxt.whereToSendOutput == DestRemote) { + if (t_thrd.postgres_cxt.whereToSendOutput == DestRemote + && !libpqsw_skip_close_command()) { pq_putemptymessage('3'); /* CloseComplete */ } } break; diff --git a/src/gausskernel/process/threadpool/knl_session.cpp b/src/gausskernel/process/threadpool/knl_session.cpp index fa6841c0f..1da5fd043 100755 --- a/src/gausskernel/process/threadpool/knl_session.cpp +++ b/src/gausskernel/process/threadpool/knl_session.cpp @@ -1375,6 +1375,14 @@ static void knl_u_clientConnTime_init(knl_u_clientConnTime_context* clientConnTi clientConnTime_cxt->checkOnlyInConnProcess = true; } +static void knl_u_libsw_init(knl_u_libsw_context* libsw_cxt) +{ + libsw_cxt->streamConn = NULL; + libsw_cxt->commandTag = NULL; + libsw_cxt->conn_trace_file = NULL; + libsw_cxt->redirect_manager = New(CurrentMemoryContext) RedirectManager(); + } + void knl_session_init(knl_session_context* sess_cxt) { Assert (0 != strncmp(CurrentMemoryContext->name, "ErrorContext", sizeof("ErrorContext"))); @@ -1464,6 +1472,7 @@ void knl_session_init(knl_session_context* sess_cxt) #ifdef ENABLE_MOT knl_u_mot_init(&sess_cxt->mot_cxt); #endif + knl_u_libsw_init(&sess_cxt->libsw_cxt); KnlURepOriginInit(&sess_cxt->reporigin_cxt); knl_u_clientConnTime_init(&sess_cxt->clientConnTime_cxt); diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index ca1b77584..e01799295 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1765,13 +1765,6 @@ void KnlLscContextInit(knl_t_lsc_context *lsc_cxt) ResourceOwnerCreate(NULL, "InitLocalSysCache", THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT)); } -static void knl_t_libsw_init(knl_t_libsw_context* libsw_cxt) -{ - libsw_cxt->streamConn = NULL; - libsw_cxt->commandTag = NULL; - libsw_cxt->redirect_manager = New(CurrentMemoryContext) RedirectManager(); - } - void knl_thread_init(knl_thread_role role) { t_thrd.role = role; @@ -1908,7 +1901,6 @@ void knl_thread_init(knl_thread_role role) #endif KnlDcfContextInit(&t_thrd.dcf_cxt); knl_t_page_compression_init(&t_thrd.page_compression_cxt); - knl_t_libsw_init(&t_thrd.libsw_cxt); knl_t_rc_init(&t_thrd.rc_cxt); } diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index de1c40490..3a33d5db2 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -507,8 +507,13 @@ bool WorkerThreadCanSeekAnotherMission(ThreadStayReason* reason) *reason = TWORKER_UNCONSUMEMESSAGE; return false; } else { - *reason = TWORKER_CANSEEKNEXTSESSION; - return true; + if (libpqsw_can_seek_next_session()) { + *reason = TWORKER_CANSEEKNEXTSESSION; + return true; + } else { + *reason = TWORKER_UNCONSUMEMESSAGE; + return false; + } } } } diff --git a/src/gausskernel/storage/ipc/ipc.cpp b/src/gausskernel/storage/ipc/ipc.cpp index 20a153a31..789899ce2 100644 --- a/src/gausskernel/storage/ipc/ipc.cpp +++ b/src/gausskernel/storage/ipc/ipc.cpp @@ -96,7 +96,8 @@ static const pg_on_exit_callback on_sess_exit_list[] = { pq_close, AtProcExit_Files, audit_processlogout, - log_disconnections + log_disconnections, + libpqsw_cleanup }; static const int on_sess_exit_size = lengthof(on_sess_exit_list); diff --git a/src/gausskernel/storage/replication/libpqsw.cpp b/src/gausskernel/storage/replication/libpqsw.cpp index 07b8aabfc..809b1fc07 100644 --- a/src/gausskernel/storage/replication/libpqsw.cpp +++ b/src/gausskernel/storage/replication/libpqsw.cpp @@ -51,7 +51,7 @@ int pq_flush(void); PGresult* libpqsw_get_result(PGconn* conn, libpqsw_transfer_standby_func transfer_func); bool libpqsw_connect(char* conninfo, const char *dbName, const char* userName); void libpqsw_disconnect(void); -void libpqsw_send_pbe(const char* buffer, size_t buffer_size); +bool libpqsw_send_pbe(const char* buffer, size_t buffer_size); bool libpqsw_begin_command(const char* commandTag); bool libpqsw_end_command(const char* commandTag); void libpqsw_set_redirect(bool redirect); @@ -69,6 +69,12 @@ static int libpqsw_skip_master_message(const char* s, size_t len) return 0; } +static inline knl_u_libsw_context* get_sw_cxt() +{ + Assert(u_sess != NULL); + return &(u_sess->libsw_cxt); +} + // create a empty message struct RedirectMessage* RedirectMessageManager::create_redirect_message(RedirectType msg_type) { @@ -77,7 +83,7 @@ RedirectMessage* RedirectMessageManager::create_redirect_message(RedirectType ms cur_msg->cur_pos = 0; cur_msg->type = msg_type; check_strncpy_s( - strncpy_s(cur_msg->commandTag, sizeof(cur_msg->commandTag), t_thrd.libsw_cxt.commandTag, strlen(t_thrd.libsw_cxt.commandTag)) + strncpy_s(cur_msg->commandTag, sizeof(cur_msg->commandTag), get_sw_cxt()->commandTag, strlen(get_sw_cxt()->commandTag)) ); for (int i = 0; i < PBE_MESSAGE_STACK; i++) { cur_msg->pbe_stack_msgs[i] = makeStringInfo(); @@ -87,7 +93,7 @@ RedirectMessage* RedirectMessageManager::create_redirect_message(RedirectType ms void RedirectMessageManager::push_message(int qtype, StringInfo msg, bool need_switch, RedirectType msg_type) { - MemoryContext old = MemoryContextSwitchTo(THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT)); + MemoryContext old = MemoryContextSwitchTo(SESS_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_DEFAULT)); // NOTICE: we malloc msg, so we need free myself if (messages == NIL || need_switch) { messages = lappend(messages, create_redirect_message(msg_type)); @@ -99,6 +105,10 @@ void RedirectMessageManager::push_message(int qtype, StringInfo msg, bool need_s ereport(ERROR, (errmsg("pbe message stackoverflow,please check it!"))); } cur_msg->pbe_types[cur_msg->cur_pos] = qtype; + if (msg_type == RT_NORMAL) { + // any normal msg will change + cur_msg->type = RT_NORMAL; + } last_message = qtype; copyStringInfo(cur_msg->pbe_stack_msgs[cur_msg->cur_pos], msg); cur_msg->cur_pos ++; @@ -141,7 +151,7 @@ void RedirectMessageManager::output_messages(StringInfo output, RedirectMessage* // flow database system log config bool RedirectManager::log_enable() { - return u_sess->attr.attr_common.log_statement == LOGSTMT_ALL; + return u_sess != NULL && u_sess->attr.attr_common.log_statement == LOGSTMT_ALL; } /* @@ -171,9 +181,9 @@ bool libpqsw_enable_autocommit() // receive primary msg to libpqrac stream. static bool libpqsw_receive(bool transfer = true) { - if (t_thrd.libsw_cxt.streamConn == NULL) + if (get_sw_cxt()->streamConn == NULL) return false; - PGconn* conn = t_thrd.libsw_cxt.streamConn; + PGconn* conn = get_sw_cxt()->streamConn; PGresult* result = NULL; PGresult* lastResult = NULL; bool retStatus = true; @@ -227,6 +237,7 @@ static bool libpqsw_receive(bool transfer = true) if (0 < conn->inEnd) { transfer_func(conn->inBuffer, conn->inEnd); } + PQclear(lastResult); pq_flush(); return true; } @@ -234,10 +245,10 @@ static bool libpqsw_receive(bool transfer = true) /* * this function will transfer msg to master, we support simple & extend query here. */ -static bool libpqsw_remote_excute_sql(uint32 type, const char* sql, uint32 size, const char *dbName, +static bool libpqsw_remote_excute_sql(int retry, const char* sql, uint32 size, const char *dbName, const char *userName, const char *commandTag, bool waitResult, bool transfer) { - if (t_thrd.libsw_cxt.streamConn == NULL || PQstatus(t_thrd.libsw_cxt.streamConn) != CONNECTION_OK) { + if (get_sw_cxt()->streamConn == NULL) { libpqsw_disconnect(); char conninfo[MAXCONNINFO]; errno_t rc = EOK; @@ -250,25 +261,79 @@ static bool libpqsw_remote_excute_sql(uint32 type, const char* sql, uint32 size, securec_check(rc, "\0", "\0"); libpqsw_connect(conninfo, dbName, userName); libpqsw_set_already_connected(); + } else if(PQstatus(get_sw_cxt()->streamConn) != CONNECTION_OK) { + libpqsw_disconnect(); + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_RESET_BY_PEER), + errmsg("connection already bad!%s", + t_thrd.walreceiverfuncs_cxt.WalRcv->conninfo))); + } else { + // nothing to do. } - libpqsw_send_pbe(sql, size); - if (waitResult) { - return libpqsw_receive(transfer); + + // to transfer sql. + if (libpqsw_send_pbe(sql, size)) { + if (waitResult) { + return libpqsw_receive(transfer); + } + } else { + // send failed, so we need retry! + libpqsw_disconnect(); + if (retry > 0) { + return libpqsw_remote_excute_sql(retry - 1, sql, size, + dbName, userName, commandTag, + waitResult, transfer); + } + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_RESET_BY_PEER), + errmsg("libpqsw: send sql failed!%s", + t_thrd.walreceiverfuncs_cxt.WalRcv->conninfo))); } return true; } RedirectManager* get_redirect_manager() { - RedirectManager* redirect_manager = (RedirectManager*)t_thrd.libsw_cxt.redirect_manager; + RedirectManager* redirect_manager = (RedirectManager*)get_sw_cxt()->redirect_manager; Assert(redirect_manager != NULL); return redirect_manager; } +// get is transaction state +static bool libpqsw_get_transaction() +{ + return get_redirect_manager()->state.transaction; +} + static bool libpqsw_remote_in_transaction() { - return t_thrd.libsw_cxt.streamConn - && t_thrd.libsw_cxt.streamConn->xactStatus != PQTRANS_IDLE; + return get_sw_cxt()->streamConn + && get_sw_cxt()->streamConn->xactStatus == PQTRANS_INTRANS; +} + +/* get if session seek next */ +bool libpqsw_can_seek_next_session() +{ + if (!get_redirect_manager()->get_remote_excute()) { + return true; + } + return !libpqsw_remote_in_transaction() && !libpqsw_get_transaction(); +} + +void libpqsw_cleanup(int code, Datum arg) +{ + if (u_sess == NULL) { + return; + } + ereport(LIBPQSW_DEFAULT_LOG_LEVEL, + (errmsg("libpqsw(%ld): cleanup called!", + get_sw_cxt()->redirect_manager == NULL ? -1 : ((int64)(get_sw_cxt()->redirect_manager))))); + if (get_sw_cxt()->streamConn != NULL) { + libpqsw_disconnect(); + } + if (get_sw_cxt()->redirect_manager != NULL) { + DELETE_EX_TYPE(get_sw_cxt()->redirect_manager, RedirectManager); + } } @@ -281,12 +346,6 @@ static void libpqsw_set_transaction(bool transaction) get_redirect_manager()->state.transaction = transaction; } -// get is transaction state -static bool libpqsw_get_transaction() -{ - return get_redirect_manager()->state.transaction; -} - static bool libpqsw_before_redirect(const char* commandTag, List* query_list, const char* query_string) { RedirectManager* redirect_manager = get_redirect_manager(); @@ -341,9 +400,10 @@ static bool libpqsw_before_redirect(const char* commandTag, List* query_list, co return need_redirect; } -static void libpqsw_after_redirect(const char* commandTag) +static void libpqsw_after_redirect() { - if (libpqsw_end_command(commandTag)) { + if (!libpqsw_remote_in_transaction()) { + finish_xact_command(); libpqsw_set_transaction(false); } } @@ -365,6 +425,13 @@ void libpqsw_set_redirect(bool redirect) get_redirect_manager()->state.redirect = redirect; } +/* udpate redirect flag */ +bool libpqsw_get_redirect() +{ + return get_redirect_manager()->state.redirect; +} + + // set is in batch insert void libpqsw_set_batch(bool batch) { @@ -380,7 +447,7 @@ static bool libpqsw_get_batch() /* query if enable redirect*/ bool libpqsw_redirect() { - return get_redirect_manager()->state.redirect || libpqsw_get_batch() || libpqsw_get_transaction(); + return libpqsw_get_redirect() || libpqsw_get_batch() || libpqsw_get_transaction(); } /* query if enable set command*/ @@ -394,6 +461,12 @@ bool libpqsw_skip_check_readonly() { return get_redirect_manager()->get_remote_excute(); } +bool libpqsw_skip_close_command() { + // only master node and in SW_SENDER will skip 'C' message. + // mix use prepared statement in master and slave may get some error + return g_instance.attr.attr_sql.enableRemoteExcute && t_thrd.role == SW_SENDER; +} + /* query if enable set command*/ void libpqsw_set_set_command(bool set_command) { @@ -425,42 +498,117 @@ static void libpqsw_inner_excute_pbe(bool waitResult, bool updateFlag) waitResult, redirect_msg->type == RT_NORMAL); if (updateFlag) { - libpqsw_after_redirect(redirect_msg->commandTag); + libpqsw_after_redirect(); } } message_manager->reset(); } +static inline int libpqsw_connection_status() { + return get_sw_cxt()->streamConn == NULL ? -1 : get_sw_cxt()->streamConn->xactStatus; +} + /* * only support P msg. */ -static inline void libpqsw_trace_p_msg(StringInfo msg) +static inline void libpqsw_trace_p_msg(int qtype, StringInfo msg) { - if (!libpqsw_log_enable()) { - return; - } const char* stmt = msg->data; const char* query = msg->data + strlen(stmt) + 1; libpqsw_trace("P: stmt=%s, query=%s", stmt, query); } /* -* if B message begin, we need search local plancache to query if -* it is start transaction command. +* only support B msg. */ -static bool libpqsw_process_bind_message(StringInfo msg) +static inline void libpqsw_trace_b_msg(int qtype, StringInfo msg) { - if (t_thrd.libsw_cxt.streamConn == NULL) { - return false; - } const char* portal = msg->data; const char* stmt = msg->data + strlen(portal) + 1; - libpqsw_trace("B: portal=%s, stmt=%s, trans:%d", portal, stmt, t_thrd.libsw_cxt.streamConn->xactStatus); - if (get_redirect_manager()->messages_manager.message_empty() - && t_thrd.libsw_cxt.streamConn->xactStatus != PQTRANS_IDLE) { - libpqsw_set_transaction(true); - return true; + libpqsw_trace("B: portal=%s, stmt=%s, trans:%d", portal, stmt, + libpqsw_connection_status() + ); +} + +/* +* only support U msg. +*/ +static inline void libpqsw_trace_u_msg(int qtype, StringInfo msg) +{ + int batch_count = ntohl(*(uint32*)(msg->data)); + const char* portal = msg->data + 4; + const char* stmt = msg->data + 4 + strlen(portal) + 1; + libpqsw_trace("U: portal=%s, stmt=%s, trans:%d, count:%d", + portal, + stmt, + libpqsw_connection_status(), + batch_count); +} + +/* +* only support C msg. +*/ +static inline void libpqsw_trace_c_msg(int qtype, StringInfo msg) +{ + unsigned char close_type = (unsigned char)(msg->data[0]); + const char* close_target = msg->data + 1; + libpqsw_trace("C: close_type=%c, target_name=%s, trans:%d, redirect:%s, remote:%s", + close_type, + close_target, + libpqsw_connection_status(), + libpqsw_redirect() ? "true" : "false", + libpqsw_remote_in_transaction() ? "true" : "false"); +} + +/* +* only support other msg. +*/ +static inline void libpqsw_trace_other_msg(int qtype, StringInfo msg) +{ + libpqsw_trace("%c: %d data=%s, size=%d, trans:%d, redirect:%s", + qtype, + qtype, + msg->data == NULL ? "" : msg->data, + msg->len, + libpqsw_connection_status(), + libpqsw_redirect() ? "true" : "false"); +} + +static inline void libpqsw_trace_empty_msg(int qtype, StringInfo msg) +{ + // nothing to do. +} + +typedef void (*trace_msg_func)(int qtype, StringInfo info); +static trace_msg_func get_msg_trace_func(int qtype) +{ + trace_msg_func cur_func = libpqsw_trace_empty_msg; + if (!libpqsw_log_enable()) { + return cur_func; } + switch(qtype) { + case 'P': + cur_func = libpqsw_trace_p_msg; + break; + case 'B': + cur_func = libpqsw_trace_b_msg; + break; + case 'U': + cur_func = libpqsw_trace_u_msg; + break; + case 'C': + cur_func = libpqsw_trace_c_msg; + break; + default: + cur_func = libpqsw_trace_other_msg; + break; + } + return cur_func; +} + +static CachedPlanSource* libpqsw_get_plancache(StringInfo msg) +{ + const char* stmt = msg->data + strlen(msg->data) + 1; CachedPlanSource* psrc = NULL; if (strlen(stmt) != 0) { PreparedStatement *pstmt = FetchPreparedStatement(stmt, false, false); @@ -472,6 +620,22 @@ static bool libpqsw_process_bind_message(StringInfo msg) } if (psrc == NULL) { libpqsw_warn("we can't find cached plan, stmt=%s", stmt); + } + return psrc; +} +/* +* if B message begin, we need search local plancache to query if +* it is start transaction command. +*/ +static bool libpqsw_process_bind_message(StringInfo msg) +{ + if (get_redirect_manager()->messages_manager.message_empty() + && libpqsw_remote_in_transaction()) { + libpqsw_set_transaction(true); + return true; + } + CachedPlanSource* psrc = libpqsw_get_plancache(msg); + if (psrc == NULL) { return false; } return libpqsw_before_redirect(psrc->commandTag, psrc->query_list, psrc->query_string); @@ -483,6 +647,14 @@ static bool libpqsw_process_bind_message(StringInfo msg) static void libpqsw_process_transfer_message(int qtype, StringInfo msg) { if (libpqsw_redirect()) { + // we need update commandTag + if (qtype == 'B') { + CachedPlanSource* psrc = libpqsw_get_plancache(msg); + if (psrc != NULL) { + libpqsw_set_command_tag(psrc->commandTag); + libpqsw_before_redirect(psrc->commandTag, psrc->query_list, psrc->query_string); + } + } return; } if (qtype == 'U') { @@ -517,14 +689,24 @@ bool libpqsw_process_message(int qtype, StringInfo msg) if (!redirect_manager->get_remote_excute()) { return false; } + trace_msg_func trace_func = get_msg_trace_func(qtype); + trace_func(qtype, msg); // the extend query start msg if (qtype == 'P') { - libpqsw_trace_p_msg(msg); return false; } // the simple query start msg if (qtype == 'Q') { return false; + } + if (qtype == 'C') { + // need close standby plancache. + if (libpqsw_redirect()) { + if (!libpqsw_skip_close_command()) { + redirect_manager->push_message(qtype, msg, false, RT_NORMAL); + } + } + return false; } // exit msg if (qtype == 'X' || qtype == -1) { @@ -623,7 +805,52 @@ bool libpqsw_end_command(const char* commandTag) // set commandTag void libpqsw_set_command_tag(const char* commandTag) { - t_thrd.libsw_cxt.commandTag = commandTag; + get_sw_cxt()->commandTag = commandTag; +} + +// session never timeout! +static void libpqsw_session_never_timout(struct pg_conn* conn) { + PGresult* res = PQexec(conn, "SET session_timeout = 0"); + PQclear(res); +} + +static void libpqsw_process_port_trace() +{ + if (!(LIBPQSW_ENABLE_PORT_TRACE)) { + return; + } + char trace_file_path[MAX_PATH_LEN + 1] = {0}; + char real_path[MAX_PATH_LEN + 1] = {0}; + char* loghome = gs_getenv_r("GAUSSLOG"); + int ret = 0; + if (loghome && '\0' != loghome[0]) { + check_backend_env(loghome); + if (realpath(loghome, real_path) == NULL) { + libpqsw_warn("failed to realpath $GAUSSLOG/pg_log!"); + return; + } + ret = snprintf_s(trace_file_path, MAX_PATH_LEN + 1, MAX_PATH_LEN, "%s/pg_log/libpqsw", real_path); + securec_check_ss(ret, "", ""); + } else { + ret = snprintf_s(trace_file_path, MAX_PATH_LEN + 1, MAX_PATH_LEN, "./pg_log/libpqsw"); + securec_check_ss(ret, "", ""); + } + + // trace_file_path not exist, create trace_file_path path + if (0 != pg_mkdir_p(trace_file_path, S_IRWXU) && errno != EEXIST) { + libpqsw_warn("failed to mkdir $GAUSSLOG/pg_log/libpqsw!"); + return; + } + + char trace_file[MAX_PATH_LEN + 1] = {0}; + ret = snprintf_s(trace_file, MAX_PATH_LEN + 1, MAX_PATH_LEN, "%s/%ld.log", + trace_file_path, (int64)(get_sw_cxt()->redirect_manager)); + securec_check_ss(ret, "", ""); + FILE* cur_file = fopen(trace_file, "w"); + if (cur_file != NULL) { + get_sw_cxt()->conn_trace_file = cur_file; + PQtrace(get_sw_cxt()->streamConn, cur_file); + } } /* @@ -644,20 +871,23 @@ bool libpqsw_connect(char* conninfo, const char *dbName, const char* userName) "fallback_application_name=%s " "connect_timeout=%d client_encoding=auto", conninfo, dbName, userName, - "rac", + "sw", u_sess->attr.attr_storage.wal_receiver_connect_timeout); securec_check_ss(nRet, "", ""); - libpqsw_info("Connecting to remote server :%s", conninfoRepl); - - t_thrd.libsw_cxt.streamConn = PQconnectdb(conninfoRepl); - if (PQstatus(t_thrd.libsw_cxt.streamConn) != CONNECTION_OK) { + + get_sw_cxt()->streamConn = PQconnectdb(conninfoRepl); + if (PQstatus(get_sw_cxt()->streamConn) != CONNECTION_OK) { + libpqsw_info("Connecting to remote server :%s ...failed!", conninfoRepl); ereport(ERROR, (errcode(ERRCODE_CONNECTION_TIMED_OUT), - errmsg("rac could not connect to the remote server,the connection info :%s : %s", + errmsg("standbywrite could not connect to the remote server,the connection info :%s : %s", conninfo, - PQerrorMessage(t_thrd.libsw_cxt.streamConn)))); + PQerrorMessage(get_sw_cxt()->streamConn)))); } + libpqsw_info("Connecting to remote server :%s ...success!", conninfoRepl); + libpqsw_session_never_timout(get_sw_cxt()->streamConn); + libpqsw_process_port_trace(); return true; } @@ -666,9 +896,23 @@ bool libpqsw_connect(char* conninfo, const char *dbName, const char* userName) */ void libpqsw_disconnect(void) { - PQfinish(t_thrd.libsw_cxt.streamConn); - t_thrd.libsw_cxt.streamConn = NULL; - get_redirect_manager()->init(); + ereport(LIBPQSW_DEFAULT_LOG_LEVEL, + (errmsg("libpqsw(%ld): libpqsw_disconnect called, conn is null:%s", + get_sw_cxt()->redirect_manager == NULL ? -1 : ((int64)(get_sw_cxt()->redirect_manager)), + get_sw_cxt()->streamConn == NULL ? "true" : "false"))); + if (get_sw_cxt()->streamConn != NULL) { + if (get_sw_cxt()->conn_trace_file != NULL) { + PQuntrace(get_sw_cxt()->streamConn); + fclose(get_sw_cxt()->conn_trace_file); + get_sw_cxt()->conn_trace_file = NULL; + } + PQfinish(get_sw_cxt()->streamConn); + get_sw_cxt()->streamConn = NULL; + } + + if (get_sw_cxt()->redirect_manager != NULL) { + get_redirect_manager()->init(); + } } // parse primary results. @@ -697,8 +941,8 @@ PGresult* libpqsw_get_result(PGconn* conn, libpqsw_transfer_standby_func transfe } } // because pqReadData will reset inStart to 0, so we must send to frontend before pqReadData. - if (0 < t_thrd.libsw_cxt.streamConn->inStart) { - transfer_func(t_thrd.libsw_cxt.streamConn->inBuffer, t_thrd.libsw_cxt.streamConn->inStart); + if (0 < get_sw_cxt()->streamConn->inStart) { + transfer_func(get_sw_cxt()->streamConn->inBuffer, get_sw_cxt()->streamConn->inStart); } /* Wait for some more data, and load it. */ if (flushResult || pqWait(TRUE, FALSE, conn) || pqReadData(conn) < 0) { @@ -706,9 +950,14 @@ PGresult* libpqsw_get_result(PGconn* conn, libpqsw_transfer_standby_func transfe * conn->errorMessage has been set by pqWait or pqReadData. We * want to append it to any already-received error message. */ + libpqsw_trace("libpqsw_get_result->read data failed, conn_state:%d,[ok->%d, bad->%d]", + conn->status, + CONNECTION_OK, + CONNECTION_BAD); conn->status = CONNECTION_BAD; pqSaveErrorResult(conn); conn->asyncStatus = PGASYNC_IDLE; + return pqPrepareAsyncResult(conn); } /* Parse it. */ @@ -790,11 +1039,13 @@ static int libpqsw_before_send(PGconn* conn) } // send extend query to master. -void libpqsw_send_pbe(const char* buffer, size_t buffer_size) +bool libpqsw_send_pbe(const char* buffer, size_t buffer_size) { - struct pg_conn* conn = t_thrd.libsw_cxt.streamConn; + struct pg_conn* conn = get_sw_cxt()->streamConn; + bool result = true; if (!libpqsw_before_send(conn)) { libpqsw_disconnect(); + result = false; ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("libpqsw_send_pbe check failed, master can't connect!"))); @@ -802,6 +1053,7 @@ void libpqsw_send_pbe(const char* buffer, size_t buffer_size) conn->outMsgEnd = conn->outMsgStart = conn->outCount; if (pqPutnchar(buffer, buffer_size, conn) < 0) { libpqsw_disconnect(); + result = false; ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS), errmsg("libpqsw_send_pbe could not send: %s", PQerrorMessage(conn)))); @@ -816,11 +1068,13 @@ void libpqsw_send_pbe(const char* buffer, size_t buffer_size) * to send it all; PQgetResult() will do any additional flushing needed. */ if (pqFlush(conn) < 0) { + result = false; pqHandleSendFailure(conn); - return; + return result; } /* OK, it's launched! */ conn->asyncStatus = PGASYNC_BUSY; + return result; } diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 490e7f09b..96bdafee2 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -67,7 +67,7 @@ #include "utils/memgroup.h" #include "storage/lock/lock.h" #include "utils/elog.h" -#include "tcop/dest.h" +#include "tcop/dest.h" typedef void (*pg_on_exit_callback)(int code, Datum arg); @@ -678,7 +678,7 @@ typedef struct knl_u_utils_context { unsigned int sql_ignore_strategy_val; HTAB* set_user_params_htab; - DestReceiver* spi_printtupDR; + DestReceiver* spi_printtupDR; } knl_u_utils_context; typedef struct knl_u_security_context { @@ -2738,6 +2738,18 @@ typedef struct knl_u_hook_context { void *pluginCCHashEqFuncs; void *plpgsqlParserSetHook; } knl_u_hook_context; + +typedef struct knl_u_libsw_context { + /* Current connection to the primary, if any */ + struct pg_conn* streamConn; + /* trace port log file */ + FILE* conn_trace_file; + /* which command in last sql */ + const char* commandTag; + /* the redirect manager */ + void* redirect_manager; +} knl_u_libsw_context; + /* PBE message flag */ typedef enum { NO_QUERY, @@ -2884,6 +2896,9 @@ typedef struct knl_session_context { struct pg_tm cache_tm; fsec_t cache_fsec; int cache_tz; + + /* standby write. */ + knl_u_libsw_context libsw_cxt; } knl_session_context; enum stp_xact_err_type { diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index 1908e7011..3663c6c1c 100755 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -3078,15 +3078,6 @@ typedef struct knl_t_security_ledger_context { void *prev_ExecutorEnd; } knl_t_security_ledger_context; -typedef struct knl_t_libsw_context { - /* Current connection to the primary, if any */ - struct pg_conn* streamConn; - /* which command in last sql */ - const char* commandTag; - /* the redirect manager */ - void* redirect_manager; -} knl_t_libsw_context; - typedef struct knl_t_csnmin_sync_context { volatile sig_atomic_t got_SIGHUP; volatile sig_atomic_t shutdown_requested; @@ -3503,7 +3494,6 @@ typedef struct knl_thrd_context { knl_t_cfs_shrinker_context cfs_shrinker_cxt; knl_t_sql_patch_context sql_patch_cxt; knl_t_dms_context dms_cxt; - knl_t_libsw_context libsw_cxt; knl_t_rc_context rc_cxt; } knl_thrd_context; diff --git a/src/include/replication/libpqsw.h b/src/include/replication/libpqsw.h index abd1129de..d28d2a85f 100644 --- a/src/include/replication/libpqsw.h +++ b/src/include/replication/libpqsw.h @@ -56,17 +56,27 @@ bool enable_remote_excute(); bool libpqsw_get_set_command(); /* if skip readonly check in P or Q message */ bool libpqsw_skip_check_readonly(); +/* judge if we need reply '3' for 'C' msg*/ +bool libpqsw_skip_close_command(); /* get unique redirect manager*/ RedirectManager* get_redirect_manager(); +/* get if session seek next */ +bool libpqsw_can_seek_next_session(); +/* clear libpqsw memory when process/session exit */ +void libpqsw_cleanup(int code, Datum arg); + #ifdef _cplusplus } #endif - // default is output log. #define LIBPQSW_ENABLE_LOG 1 #define LIBPQSW_DEFAULT_LOG_LEVEL LOG +// default is not output libpq message trace +// log will in $GAUSSLOG/libpqsw/xx.log +#define LIBPQSW_ENABLE_PORT_TRACE (0) + #define libpqsw_log_enable() (get_redirect_manager()->log_enable()) #if LIBPQSW_ENABLE_LOG #define libpqsw_trace(fmt, ...) (get_redirect_manager()->logtrace(LIBPQSW_DEFAULT_LOG_LEVEL, fmt, ##__VA_ARGS__)) @@ -102,7 +112,7 @@ typedef struct { #define PBE_MAX_SET_BLOCK (10) enum RedirectType { RT_NORMAL, //transfer to standby - RT_SET //not transfer to standby + RT_SET //not transfer to standby,set props=xxx or 'C' close msg }; typedef struct { @@ -127,6 +137,9 @@ public: } void reset() { + if (messages == NIL) { + return; + } foreach_cell(message, messages) { free_redirect_message((RedirectMessage*)lfirst(message)); } @@ -205,7 +218,16 @@ public: state.need_end = true; state.already_connected = false; } - + + void Destroy() + { + messages_manager.reset(); + if (log_trace_msg != NULL) { + DestroyStringInfo(log_trace_msg); + log_trace_msg = NULL; + } + } + bool push_message(int qtype, StringInfo msg, bool need_switch, RedirectType msg_type) { // if one msg have many sql like 'set a;set b;set c', don't switch @@ -233,7 +255,7 @@ public: void logtrace(int level, const char* fmt, ...) { - if (!log_enable()) { + if (!log_enable() || log_trace_msg == NULL) { return; } if (fmt != log_trace_msg->data) { @@ -244,12 +266,13 @@ public: (void)vsnprintf_s(log_trace_msg->data, log_trace_msg->maxlen, log_trace_msg->maxlen - 1, fmt, args); va_end(args); } - ereport(level, (errmsg("libpqsw:%s", log_trace_msg->data))); + ereport(level, (errmsg("libpqsw(%ld-%ld):%s", (uint64)this, + u_sess == NULL ? 0 : u_sess->session_id, log_trace_msg->data))); } virtual ~RedirectManager() { - DestroyStringInfo(log_trace_msg); + Destroy(); } public: RedirectState state;