Add cap for session variable synchronization for proxy version compatibility
This commit is contained in:
parent
8fab2d62ff
commit
48554a6c5f
5
deps/oblib/src/rpc/obmysql/ob_mysql_packet.h
vendored
5
deps/oblib/src/rpc/obmysql/ob_mysql_packet.h
vendored
@ -156,6 +156,8 @@ union ObProxyCapabilityFlags
|
||||
&& is_ob_protocol_v2_support(); }
|
||||
bool is_new_extra_info_support() const { return 1 == cap_flags_.OB_CAP_PROXY_NEW_EXTRA_INFO
|
||||
&& is_ob_protocol_v2_support(); }
|
||||
bool is_session_var_sync_support() const { return 1 == cap_flags_.OB_CAP_PROXY_SESSION_VAR_SYNC
|
||||
&& is_ob_protocol_v2_support(); }
|
||||
|
||||
uint64_t capability_;
|
||||
struct CapabilityFlags
|
||||
@ -184,7 +186,8 @@ union ObProxyCapabilityFlags
|
||||
// for full trace_route
|
||||
uint64_t OB_CAP_PROXY_FULL_LINK_TRACING: 1;
|
||||
uint64_t OB_CAP_PROXY_NEW_EXTRA_INFO: 1;
|
||||
uint64_t OB_CAP_RESERVED_NOT_USE: 48;
|
||||
uint64_t OB_CAP_PROXY_SESSION_VAR_SYNC: 1;
|
||||
uint64_t OB_CAP_RESERVED_NOT_USE: 47;
|
||||
} cap_flags_;
|
||||
};
|
||||
|
||||
|
@ -265,6 +265,8 @@ int ObMPConnect::process()
|
||||
session->set_ob20_protocol(conn->proxy_cap_flags_.is_ob_protocol_v2_support());
|
||||
// set sql request level to session, to avoid sql request dead lock between OB cluster (eg. dblink)
|
||||
session->set_sql_request_level(conn->sql_req_level_);
|
||||
// set session var sync info.
|
||||
session->set_session_var_sync(conn->proxy_cap_flags_.is_session_var_sync_support());
|
||||
|
||||
LOG_TRACE("setup user resource group OK",
|
||||
"user_id", session->get_user_id(),
|
||||
@ -1523,6 +1525,7 @@ int ObMPConnect::check_update_proxy_capability(ObSMConnection &conn) const
|
||||
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_SESSIOIN_SYNC = 1;
|
||||
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_FULL_LINK_TRACING = 1;
|
||||
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_NEW_EXTRA_INFO = 1;
|
||||
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_SESSION_VAR_SYNC = 1;
|
||||
conn.proxy_cap_flags_.capability_ = (server_proxy_cap_flag.capability_ & client_proxy_cap);//if old java client, set it 0
|
||||
|
||||
LOG_DEBUG("Negotiated capability",
|
||||
|
@ -46,11 +46,13 @@ int ObMPUtils::add_changed_session_info(OMPKOK &ok_pkt, sql::ObSQLSessionInfo &s
|
||||
LOG_DEBUG("sys var changed", K(session.get_tenant_name()), K(sys_var.count()));
|
||||
// if sys_var change, set SESSION_SYNC_SYS_VAR type's encoder->is_changed_ = true
|
||||
// for turn on serialize sys delta vars.
|
||||
ObSessInfoEncoder* encoder = NULL;
|
||||
if (OB_FAIL(session.get_sess_encoder(SESSION_SYNC_SYS_VAR, encoder))) {
|
||||
LOG_WARN("failed to get session encoder", K(ret));
|
||||
} else {
|
||||
encoder->is_changed_ = true;
|
||||
if (session.is_session_var_sync()) {
|
||||
ObSessInfoEncoder* encoder = NULL;
|
||||
if (OB_FAIL(session.get_sess_encoder(SESSION_SYNC_SYS_VAR, encoder))) {
|
||||
LOG_WARN("failed to get session encoder", K(ret));
|
||||
} else {
|
||||
encoder->is_changed_ = true;
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < sys_var.count(); ++i) {
|
||||
sql::ObBasicSessionInfo::ChangedVar change_var = sys_var.at(i);
|
||||
|
@ -150,6 +150,7 @@ ObSQLSessionInfo::ObSQLSessionInfo() :
|
||||
pl_query_sender_(NULL),
|
||||
pl_ps_protocol_(false),
|
||||
is_ob20_protocol_(false),
|
||||
is_session_var_sync_(false),
|
||||
last_plan_id_(0),
|
||||
pl_sync_pkg_vars_(NULL),
|
||||
inner_conn_(NULL),
|
||||
@ -340,6 +341,7 @@ void ObSQLSessionInfo::reset(bool skip_sys_var)
|
||||
auto_flush_trace_ = false;
|
||||
coninfo_set_by_sess_ = false;
|
||||
is_ob20_protocol_ = false;
|
||||
is_session_var_sync_ = false;
|
||||
int temp_ret = OB_SUCCESS;
|
||||
dblink_context_.reset();
|
||||
sql_req_level_ = 0;
|
||||
|
@ -701,6 +701,10 @@ public:
|
||||
inline void set_ob20_protocol(bool is_20protocol) { is_ob20_protocol_ = is_20protocol; }
|
||||
inline bool is_ob20_protocol() { return is_ob20_protocol_; }
|
||||
|
||||
inline void set_session_var_sync(bool is_session_var_sync)
|
||||
{ is_session_var_sync_ = is_session_var_sync; }
|
||||
inline bool is_session_var_sync() { return is_session_var_sync_; }
|
||||
|
||||
int replace_user_variable(const common::ObString &name, const ObSessionVariable &value);
|
||||
int replace_user_variable(
|
||||
ObExecContext &ctx, const common::ObString &name, const ObSessionVariable &value);
|
||||
@ -1104,6 +1108,8 @@ private:
|
||||
bool pl_ps_protocol_; // send query result use this protocol
|
||||
bool is_ob20_protocol_; // mark as whether use oceanbase 2.0 protocol
|
||||
|
||||
bool is_session_var_sync_; //session var sync support flag.
|
||||
|
||||
int64_t last_plan_id_; // 记录上一个计划的 plan_id,用于 show trace 中显示 sql 物理计划
|
||||
|
||||
common::hash::ObHashSet<common::ObString> *pl_sync_pkg_vars_ = NULL;
|
||||
|
Loading…
x
Reference in New Issue
Block a user