!3112 [bugfix] 解决开启thread_pool之后standywrite模块异常的问题。

Merge pull request !3112 from 周斌/sw_3
This commit is contained in:
opengauss-bot
2023-03-14 14:57:50 +00:00
committed by Gitee
9 changed files with 382 additions and 87 deletions

View File

@ -67,7 +67,7 @@
#include "utils/memgroup.h"
#include "storage/lock/lock.h"
#include "utils/elog.h"
#include "tcop/dest.h"
#include "tcop/dest.h"
typedef void (*pg_on_exit_callback)(int code, Datum arg);
@ -678,7 +678,7 @@ typedef struct knl_u_utils_context {
unsigned int sql_ignore_strategy_val;
HTAB* set_user_params_htab;
DestReceiver* spi_printtupDR;
DestReceiver* spi_printtupDR;
} knl_u_utils_context;
typedef struct knl_u_security_context {
@ -2738,6 +2738,18 @@ typedef struct knl_u_hook_context {
void *pluginCCHashEqFuncs;
void *plpgsqlParserSetHook;
} knl_u_hook_context;
typedef struct knl_u_libsw_context {
/* Current connection to the primary, if any */
struct pg_conn* streamConn;
/* trace port log file */
FILE* conn_trace_file;
/* which command in last sql */
const char* commandTag;
/* the redirect manager */
void* redirect_manager;
} knl_u_libsw_context;
/* PBE message flag */
typedef enum {
NO_QUERY,
@ -2884,6 +2896,9 @@ typedef struct knl_session_context {
struct pg_tm cache_tm;
fsec_t cache_fsec;
int cache_tz;
/* standby write. */
knl_u_libsw_context libsw_cxt;
} knl_session_context;
enum stp_xact_err_type {

View File

@ -3078,15 +3078,6 @@ typedef struct knl_t_security_ledger_context {
void *prev_ExecutorEnd;
} knl_t_security_ledger_context;
typedef struct knl_t_libsw_context {
/* Current connection to the primary, if any */
struct pg_conn* streamConn;
/* which command in last sql */
const char* commandTag;
/* the redirect manager */
void* redirect_manager;
} knl_t_libsw_context;
typedef struct knl_t_csnmin_sync_context {
volatile sig_atomic_t got_SIGHUP;
volatile sig_atomic_t shutdown_requested;
@ -3503,7 +3494,6 @@ typedef struct knl_thrd_context {
knl_t_cfs_shrinker_context cfs_shrinker_cxt;
knl_t_sql_patch_context sql_patch_cxt;
knl_t_dms_context dms_cxt;
knl_t_libsw_context libsw_cxt;
knl_t_rc_context rc_cxt;
} knl_thrd_context;

View File

@ -56,17 +56,27 @@ bool enable_remote_excute();
bool libpqsw_get_set_command();
/* if skip readonly check in P or Q message */
bool libpqsw_skip_check_readonly();
/* judge if we need reply '3' for 'C' msg*/
bool libpqsw_skip_close_command();
/* get unique redirect manager*/
RedirectManager* get_redirect_manager();
/* get if session seek next */
bool libpqsw_can_seek_next_session();
/* clear libpqsw memory when process/session exit */
void libpqsw_cleanup(int code, Datum arg);
#ifdef _cplusplus
}
#endif
// default is output log.
#define LIBPQSW_ENABLE_LOG 1
#define LIBPQSW_DEFAULT_LOG_LEVEL LOG
// default is not output libpq message trace
// log will in $GAUSSLOG/libpqsw/xx.log
#define LIBPQSW_ENABLE_PORT_TRACE (0)
#define libpqsw_log_enable() (get_redirect_manager()->log_enable())
#if LIBPQSW_ENABLE_LOG
#define libpqsw_trace(fmt, ...) (get_redirect_manager()->logtrace(LIBPQSW_DEFAULT_LOG_LEVEL, fmt, ##__VA_ARGS__))
@ -102,7 +112,7 @@ typedef struct {
#define PBE_MAX_SET_BLOCK (10)
enum RedirectType {
RT_NORMAL, //transfer to standby
RT_SET //not transfer to standby
RT_SET //not transfer to standby,set props=xxx or 'C' close msg
};
typedef struct {
@ -127,6 +137,9 @@ public:
}
void reset() {
if (messages == NIL) {
return;
}
foreach_cell(message, messages) {
free_redirect_message((RedirectMessage*)lfirst(message));
}
@ -205,7 +218,16 @@ public:
state.need_end = true;
state.already_connected = false;
}
void Destroy()
{
messages_manager.reset();
if (log_trace_msg != NULL) {
DestroyStringInfo(log_trace_msg);
log_trace_msg = NULL;
}
}
bool push_message(int qtype, StringInfo msg, bool need_switch, RedirectType msg_type)
{
// if one msg have many sql like 'set a;set b;set c', don't switch
@ -233,7 +255,7 @@ public:
void logtrace(int level, const char* fmt, ...)
{
if (!log_enable()) {
if (!log_enable() || log_trace_msg == NULL) {
return;
}
if (fmt != log_trace_msg->data) {
@ -244,12 +266,13 @@ public:
(void)vsnprintf_s(log_trace_msg->data, log_trace_msg->maxlen, log_trace_msg->maxlen - 1, fmt, args);
va_end(args);
}
ereport(level, (errmsg("libpqsw:%s", log_trace_msg->data)));
ereport(level, (errmsg("libpqsw(%ld-%ld):%s", (uint64)this,
u_sess == NULL ? 0 : u_sess->session_id, log_trace_msg->data)));
}
virtual ~RedirectManager()
{
DestroyStringInfo(log_trace_msg);
Destroy();
}
public:
RedirectState state;