diff --git a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h index 8ffc51e748..26f8c78e88 100644 --- a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h +++ b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h @@ -162,6 +162,8 @@ union ObProxyCapabilityFlags bool is_weak_stale_feedback() const { return 1 == cap_flags_.OB_CAP_PROXY_WEAK_STALE_FEEDBACK; } bool is_flt_show_trace_support() const { return 1 == cap_flags_.OB_CAP_PROXY_FULL_LINK_TRACING_EXT && is_ob_protocol_v2_support(); } + bool is_session_sync_support() const { return 1 == cap_flags_.OB_CAP_PROXY_SESSIOIN_SYNC + && is_ob_protocol_v2_support(); } bool is_load_local_support() const { return 1 == cap_flags_.OB_CAP_LOCAL_FILES; } bool is_client_sessid_support() const { return 1 == cap_flags_.OB_CAP_PROXY_CLIENT_SESSION_ID; } diff --git a/src/observer/mysql/obmp_connect.cpp b/src/observer/mysql/obmp_connect.cpp index 43a24862a7..54bb9476a7 100644 --- a/src/observer/mysql/obmp_connect.cpp +++ b/src/observer/mysql/obmp_connect.cpp @@ -364,6 +364,7 @@ int ObMPConnect::process() // proxy mode & direct mode session->set_client_sessid_support(conn->proxy_cap_flags_.is_client_sessid_support() || (conn->proxy_sessid_ == 0)); + session->set_session_sync_support(conn->proxy_cap_flags_.is_session_sync_support()); session->get_control_info().support_show_trace_ = conn->proxy_cap_flags_.is_flt_show_trace_support(); LOG_TRACE("setup user resource group OK", "user_id", session->get_user_id(), diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index 2e3b1be73d..da276ca44d 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -604,6 +604,9 @@ OB_INLINE void ObResultSet::store_affected_rows(ObPhysicalPlanCtx &plan_ctx) affected_row = get_affected_rows(); } NG_TRACE_EXT(affected_rows, OB_ID(affected_rows), affected_row); + if (my_session_.is_session_sync_support()) { + my_session_.set_affected_rows_is_changed(affected_row); + } my_session_.set_affected_rows(affected_row); } diff --git a/src/sql/session/ob_sql_session_info.cpp b/src/sql/session/ob_sql_session_info.cpp index 1962dbe095..22b552c0b2 100644 --- a/src/sql/session/ob_sql_session_info.cpp +++ b/src/sql/session/ob_sql_session_info.cpp @@ -200,7 +200,8 @@ ObSQLSessionInfo::ObSQLSessionInfo(const uint64_t tenant_id) : in_bytes_(0), out_bytes_(0), current_dblink_sequence_id_(0), - client_non_standard_(false) + client_non_standard_(false), + is_session_sync_support_(false) { MEMSET(tenant_buff_, 0, sizeof(share::ObTenantSpaceFetcher)); MEMSET(vip_buf_, 0, sizeof(vip_buf_)); @@ -384,6 +385,7 @@ void ObSQLSessionInfo::reset(bool skip_sys_var) MEMSET(vip_buf_, 0, sizeof(vip_buf_)); current_dblink_sequence_id_ = 0; dblink_sequence_schemas_.reset(); + is_session_sync_support_ = false; } void ObSQLSessionInfo::clean_status() @@ -1688,7 +1690,8 @@ OB_DEF_SERIALIZE(ObSQLSessionInfo) gtt_session_scope_unique_id_, gtt_trans_scope_unique_id_, gtt_session_scope_ids_, - gtt_trans_scope_ids_); + gtt_trans_scope_ids_, + affected_rows_); return ret; } @@ -1719,7 +1722,8 @@ OB_DEF_DESERIALIZE(ObSQLSessionInfo) gtt_session_scope_unique_id_, gtt_trans_scope_unique_id_, gtt_session_scope_ids_, - gtt_trans_scope_ids_); + gtt_trans_scope_ids_, + affected_rows_); (void)ObSQLUtils::adjust_time_by_ntp_offset(thread_data_.cur_query_start_time_); return ret; } @@ -1751,7 +1755,8 @@ OB_DEF_SERIALIZE_SIZE(ObSQLSessionInfo) gtt_session_scope_unique_id_, gtt_trans_scope_unique_id_, gtt_session_scope_ids_, - gtt_trans_scope_ids_); + gtt_trans_scope_ids_, + affected_rows_); return len; } @@ -3988,6 +3993,88 @@ int ObSequenceCurrvalEncoder::display_sess_info(ObSQLSessionInfo &sess, return ret; } +int ObQueryInfoEncoder::serialize(ObSQLSessionInfo &sess, char *buf, const int64_t buf_len, int64_t &pos) +{ + int ret = OB_SUCCESS; + OB_UNIS_ENCODE(sess.get_affected_rows()); + return ret; +} + +int ObQueryInfoEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t data_len, int64_t &pos) +{ + int ret = OB_SUCCESS; + int64_t affected_rows = 0; + int64_t found_rows = 0; + OB_UNIS_DECODE(affected_rows); + sess.set_affected_rows(affected_rows); + return ret; +} + +int ObQueryInfoEncoder::get_serialize_size(ObSQLSessionInfo &sess, int64_t &len) const +{ + int ret = OB_SUCCESS; + OB_UNIS_ADD_LEN(sess.get_affected_rows()); + return ret; +} + +int ObQueryInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(serialize(sess, buf, length, pos))) { + LOG_WARN("failed to fetch session info.", K(ret), K(pos), K(length)); + } + return ret; +} + +int64_t ObQueryInfoEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess) +{ + int64_t len = 0; + get_serialize_size(sess, len); + return len; +} + +int ObQueryInfoEncoder::compare_sess_info(const char* current_sess_buf, int64_t current_sess_length, + const char* last_sess_buf, int64_t last_sess_length) +{ + int ret = OB_SUCCESS; + if (current_sess_length != last_sess_length) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to compare session info", K(ret), K(current_sess_length), K(last_sess_length), + KPHEX(current_sess_buf, current_sess_length), KPHEX(last_sess_buf, last_sess_length)); + } else if (memcmp(current_sess_buf, last_sess_buf, current_sess_length) == 0) { + LOG_TRACE("success to compare session info", K(ret)); + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to compare buf session info", K(ret), + KPHEX(current_sess_buf, current_sess_length), KPHEX(last_sess_buf, last_sess_length)); + } + return ret; +} + +int ObQueryInfoEncoder::display_sess_info(ObSQLSessionInfo &sess, const char* current_sess_buf, + int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) +{ + int ret = OB_SUCCESS; + UNUSED(current_sess_buf); + UNUSED(current_sess_length); + int64_t pos = 0; + const char *buf = last_sess_buf; + int64_t data_len = last_sess_length; + int64_t affected_rows = 0; + + LST_DO_CODE(OB_UNIS_DECODE, affected_rows); + if (sess.get_affected_rows() != affected_rows) { + share::ObTaskController::get().allow_next_syslog(); + LOG_WARN("failed to verify affected_rows", K(ret), + "current_affected_rows", sess.get_affected_rows(), + "last_affected_rows", affected_rows); + } else { + share::ObTaskController::get().allow_next_syslog(); + LOG_INFO("success to verify VariousInfo", K(ret)); + } + return ret; +} + OB_DEF_SERIALIZE(ObInnerContextMap) { int ret = OB_SUCCESS; diff --git a/src/sql/session/ob_sql_session_info.h b/src/sql/session/ob_sql_session_info.h index efb8a0c239..401aaca575 100644 --- a/src/sql/session/ob_sql_session_info.h +++ b/src/sql/session/ob_sql_session_info.h @@ -227,6 +227,7 @@ enum SessionSyncInfoType { SESSION_SYNC_TXN_EXTRA_INFO = 8, // 8: txn dynamic info SESSION_SYNC_SEQUENCE_CURRVAL = 9, // for sequence currval SESSION_SYNC_ERROR_SYS_VAR = 10, // for error scene need sync sysvar info + SESSION_SYNC_QUERY_INFO = 11, // for query level session info SESSION_SYNC_MAX_TYPE, }; @@ -394,6 +395,21 @@ public: int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) override; }; +class ObQueryInfoEncoder : public ObSessInfoEncoder { +public: + ObQueryInfoEncoder() : ObSessInfoEncoder() {} + virtual ~ObQueryInfoEncoder() {} + virtual int serialize(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override; + virtual int deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t length, int64_t &pos) override; + virtual int get_serialize_size(ObSQLSessionInfo &sess, int64_t &length) const override; + virtual int fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override; + virtual int64_t get_fetch_sess_info_size(ObSQLSessionInfo& sess) override; + virtual int compare_sess_info(const char* current_sess_buf, int64_t current_sess_length, + const char* last_sess_buf, int64_t last_sess_length) override; + virtual int display_sess_info(ObSQLSessionInfo &sess, const char* current_sess_buf, + int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) override; +}; + #define DEF_SESSION_TXN_ENCODER(CLS) \ class CLS final : public ObSessInfoEncoder { \ public: \ @@ -908,6 +924,8 @@ public: inline void set_ob20_protocol(bool is_20protocol) { is_ob20_protocol_ = is_20protocol; } inline bool is_ob20_protocol() { return is_ob20_protocol_; } + inline void set_session_sync_support(bool is_session_sync_support) { is_session_sync_support_ = is_session_sync_support; } + inline bool is_session_sync_support() { return is_session_sync_support_; } inline void set_session_var_sync(bool is_session_var_sync) { is_session_var_sync_ = is_session_var_sync; } @@ -1139,6 +1157,7 @@ public: ObControlInfoEncoder &get_control_info_encoder() { return control_info_encoder_;} ObErrorSyncSysVarEncoder &get_error_sync_sys_var_encoder() { return error_sync_sys_var_encoder_;} ObSequenceCurrvalEncoder &get_sequence_currval_encoder() { return sequence_currval_encoder_; } + ObQueryInfoEncoder &get_query_info_encoder() { return query_info_encoder_; } ObContextsMap &get_contexts_map() { return contexts_map_; } ObSequenceCurrvalMap &get_sequence_currval_map() { return sequence_currval_map_; } ObDBlinkSequenceIdMap &get_dblink_sequence_id_map() { return dblink_sequence_id_map_; } @@ -1302,6 +1321,7 @@ public: ObOptimizerTraceImpl& get_optimizer_tracer() { return optimizer_tracer_; } public: bool has_tx_level_temp_table() const { return tx_desc_ && tx_desc_->with_temporary_table(); } + void set_affected_rows_is_changed(int64_t affected_rows); private: int close_all_ps_stmt(); void destroy_contexts_map(ObContextsMap &map, common::ObIAllocator &alloc); @@ -1479,7 +1499,8 @@ private: &txn_participants_info_encoder_, &txn_extra_info_encoder_, &sequence_currval_encoder_, - &error_sync_sys_var_encoder_ + &error_sync_sys_var_encoder_, + &query_info_encoder_, }; ObSysVarEncoder sys_var_encoder_; //ObUserVarEncoder usr_var_encoder_; @@ -1493,6 +1514,7 @@ private: ObTxnExtraInfoEncoder txn_extra_info_encoder_; ObSequenceCurrvalEncoder sequence_currval_encoder_; ObErrorSyncSysVarEncoder error_sync_sys_var_encoder_; + ObQueryInfoEncoder query_info_encoder_; public: void post_sync_session_info(); void prep_txn_free_route_baseline(bool reset_audit = true); @@ -1542,6 +1564,7 @@ private: int64_t current_dblink_sequence_id_; common::ObSEArray dblink_sequence_schemas_; bool client_non_standard_; + bool is_session_sync_support_; // session_sync_support flag. }; inline bool ObSQLSessionInfo::is_terminate(int &ret) const @@ -1568,6 +1591,13 @@ inline bool ObSQLSessionInfo::is_terminate(int &ret) const return bret; } +inline void ObSQLSessionInfo::set_affected_rows_is_changed(int64_t affected_rows) +{ + if (affected_rows != get_affected_rows()) { + query_info_encoder_.is_changed_ = true; + } +} + } // namespace sql } // namespace oceanbase