Fix: affected row sync
This commit is contained in:
2
deps/oblib/src/rpc/obmysql/ob_mysql_packet.h
vendored
2
deps/oblib/src/rpc/obmysql/ob_mysql_packet.h
vendored
@ -162,6 +162,8 @@ union ObProxyCapabilityFlags
|
||||
bool is_weak_stale_feedback() const { return 1 == cap_flags_.OB_CAP_PROXY_WEAK_STALE_FEEDBACK; }
|
||||
bool is_flt_show_trace_support() const { return 1 == cap_flags_.OB_CAP_PROXY_FULL_LINK_TRACING_EXT
|
||||
&& is_ob_protocol_v2_support(); }
|
||||
bool is_session_sync_support() const { return 1 == cap_flags_.OB_CAP_PROXY_SESSIOIN_SYNC
|
||||
&& is_ob_protocol_v2_support(); }
|
||||
bool is_load_local_support() const { return 1 == cap_flags_.OB_CAP_LOCAL_FILES; }
|
||||
bool is_client_sessid_support() const { return 1 == cap_flags_.OB_CAP_PROXY_CLIENT_SESSION_ID; }
|
||||
|
||||
|
||||
@ -364,6 +364,7 @@ int ObMPConnect::process()
|
||||
// proxy mode & direct mode
|
||||
session->set_client_sessid_support(conn->proxy_cap_flags_.is_client_sessid_support()
|
||||
|| (conn->proxy_sessid_ == 0));
|
||||
session->set_session_sync_support(conn->proxy_cap_flags_.is_session_sync_support());
|
||||
session->get_control_info().support_show_trace_ = conn->proxy_cap_flags_.is_flt_show_trace_support();
|
||||
LOG_TRACE("setup user resource group OK",
|
||||
"user_id", session->get_user_id(),
|
||||
|
||||
@ -604,6 +604,9 @@ OB_INLINE void ObResultSet::store_affected_rows(ObPhysicalPlanCtx &plan_ctx)
|
||||
affected_row = get_affected_rows();
|
||||
}
|
||||
NG_TRACE_EXT(affected_rows, OB_ID(affected_rows), affected_row);
|
||||
if (my_session_.is_session_sync_support()) {
|
||||
my_session_.set_affected_rows_is_changed(affected_row);
|
||||
}
|
||||
my_session_.set_affected_rows(affected_row);
|
||||
}
|
||||
|
||||
|
||||
@ -200,7 +200,8 @@ ObSQLSessionInfo::ObSQLSessionInfo(const uint64_t tenant_id) :
|
||||
in_bytes_(0),
|
||||
out_bytes_(0),
|
||||
current_dblink_sequence_id_(0),
|
||||
client_non_standard_(false)
|
||||
client_non_standard_(false),
|
||||
is_session_sync_support_(false)
|
||||
{
|
||||
MEMSET(tenant_buff_, 0, sizeof(share::ObTenantSpaceFetcher));
|
||||
MEMSET(vip_buf_, 0, sizeof(vip_buf_));
|
||||
@ -384,6 +385,7 @@ void ObSQLSessionInfo::reset(bool skip_sys_var)
|
||||
MEMSET(vip_buf_, 0, sizeof(vip_buf_));
|
||||
current_dblink_sequence_id_ = 0;
|
||||
dblink_sequence_schemas_.reset();
|
||||
is_session_sync_support_ = false;
|
||||
}
|
||||
|
||||
void ObSQLSessionInfo::clean_status()
|
||||
@ -1688,7 +1690,8 @@ OB_DEF_SERIALIZE(ObSQLSessionInfo)
|
||||
gtt_session_scope_unique_id_,
|
||||
gtt_trans_scope_unique_id_,
|
||||
gtt_session_scope_ids_,
|
||||
gtt_trans_scope_ids_);
|
||||
gtt_trans_scope_ids_,
|
||||
affected_rows_);
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1719,7 +1722,8 @@ OB_DEF_DESERIALIZE(ObSQLSessionInfo)
|
||||
gtt_session_scope_unique_id_,
|
||||
gtt_trans_scope_unique_id_,
|
||||
gtt_session_scope_ids_,
|
||||
gtt_trans_scope_ids_);
|
||||
gtt_trans_scope_ids_,
|
||||
affected_rows_);
|
||||
(void)ObSQLUtils::adjust_time_by_ntp_offset(thread_data_.cur_query_start_time_);
|
||||
return ret;
|
||||
}
|
||||
@ -1751,7 +1755,8 @@ OB_DEF_SERIALIZE_SIZE(ObSQLSessionInfo)
|
||||
gtt_session_scope_unique_id_,
|
||||
gtt_trans_scope_unique_id_,
|
||||
gtt_session_scope_ids_,
|
||||
gtt_trans_scope_ids_);
|
||||
gtt_trans_scope_ids_,
|
||||
affected_rows_);
|
||||
return len;
|
||||
}
|
||||
|
||||
@ -3988,6 +3993,88 @@ int ObSequenceCurrvalEncoder::display_sess_info(ObSQLSessionInfo &sess,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObQueryInfoEncoder::serialize(ObSQLSessionInfo &sess, char *buf, const int64_t buf_len, int64_t &pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
OB_UNIS_ENCODE(sess.get_affected_rows());
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObQueryInfoEncoder::deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t data_len, int64_t &pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t affected_rows = 0;
|
||||
int64_t found_rows = 0;
|
||||
OB_UNIS_DECODE(affected_rows);
|
||||
sess.set_affected_rows(affected_rows);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObQueryInfoEncoder::get_serialize_size(ObSQLSessionInfo &sess, int64_t &len) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
OB_UNIS_ADD_LEN(sess.get_affected_rows());
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObQueryInfoEncoder::fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(serialize(sess, buf, length, pos))) {
|
||||
LOG_WARN("failed to fetch session info.", K(ret), K(pos), K(length));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObQueryInfoEncoder::get_fetch_sess_info_size(ObSQLSessionInfo& sess)
|
||||
{
|
||||
int64_t len = 0;
|
||||
get_serialize_size(sess, len);
|
||||
return len;
|
||||
}
|
||||
|
||||
int ObQueryInfoEncoder::compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
|
||||
const char* last_sess_buf, int64_t last_sess_length)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (current_sess_length != last_sess_length) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to compare session info", K(ret), K(current_sess_length), K(last_sess_length),
|
||||
KPHEX(current_sess_buf, current_sess_length), KPHEX(last_sess_buf, last_sess_length));
|
||||
} else if (memcmp(current_sess_buf, last_sess_buf, current_sess_length) == 0) {
|
||||
LOG_TRACE("success to compare session info", K(ret));
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to compare buf session info", K(ret),
|
||||
KPHEX(current_sess_buf, current_sess_length), KPHEX(last_sess_buf, last_sess_length));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObQueryInfoEncoder::display_sess_info(ObSQLSessionInfo &sess, const char* current_sess_buf,
|
||||
int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
UNUSED(current_sess_buf);
|
||||
UNUSED(current_sess_length);
|
||||
int64_t pos = 0;
|
||||
const char *buf = last_sess_buf;
|
||||
int64_t data_len = last_sess_length;
|
||||
int64_t affected_rows = 0;
|
||||
|
||||
LST_DO_CODE(OB_UNIS_DECODE, affected_rows);
|
||||
if (sess.get_affected_rows() != affected_rows) {
|
||||
share::ObTaskController::get().allow_next_syslog();
|
||||
LOG_WARN("failed to verify affected_rows", K(ret),
|
||||
"current_affected_rows", sess.get_affected_rows(),
|
||||
"last_affected_rows", affected_rows);
|
||||
} else {
|
||||
share::ObTaskController::get().allow_next_syslog();
|
||||
LOG_INFO("success to verify VariousInfo", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
OB_DEF_SERIALIZE(ObInnerContextMap)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -227,6 +227,7 @@ enum SessionSyncInfoType {
|
||||
SESSION_SYNC_TXN_EXTRA_INFO = 8, // 8: txn dynamic info
|
||||
SESSION_SYNC_SEQUENCE_CURRVAL = 9, // for sequence currval
|
||||
SESSION_SYNC_ERROR_SYS_VAR = 10, // for error scene need sync sysvar info
|
||||
SESSION_SYNC_QUERY_INFO = 11, // for query level session info
|
||||
SESSION_SYNC_MAX_TYPE,
|
||||
};
|
||||
|
||||
@ -394,6 +395,21 @@ public:
|
||||
int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) override;
|
||||
};
|
||||
|
||||
class ObQueryInfoEncoder : public ObSessInfoEncoder {
|
||||
public:
|
||||
ObQueryInfoEncoder() : ObSessInfoEncoder() {}
|
||||
virtual ~ObQueryInfoEncoder() {}
|
||||
virtual int serialize(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override;
|
||||
virtual int deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t length, int64_t &pos) override;
|
||||
virtual int get_serialize_size(ObSQLSessionInfo &sess, int64_t &length) const override;
|
||||
virtual int fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override;
|
||||
virtual int64_t get_fetch_sess_info_size(ObSQLSessionInfo& sess) override;
|
||||
virtual int compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
|
||||
const char* last_sess_buf, int64_t last_sess_length) override;
|
||||
virtual int display_sess_info(ObSQLSessionInfo &sess, const char* current_sess_buf,
|
||||
int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) override;
|
||||
};
|
||||
|
||||
#define DEF_SESSION_TXN_ENCODER(CLS) \
|
||||
class CLS final : public ObSessInfoEncoder { \
|
||||
public: \
|
||||
@ -908,6 +924,8 @@ public:
|
||||
|
||||
inline void set_ob20_protocol(bool is_20protocol) { is_ob20_protocol_ = is_20protocol; }
|
||||
inline bool is_ob20_protocol() { return is_ob20_protocol_; }
|
||||
inline void set_session_sync_support(bool is_session_sync_support) { is_session_sync_support_ = is_session_sync_support; }
|
||||
inline bool is_session_sync_support() { return is_session_sync_support_; }
|
||||
|
||||
inline void set_session_var_sync(bool is_session_var_sync)
|
||||
{ is_session_var_sync_ = is_session_var_sync; }
|
||||
@ -1139,6 +1157,7 @@ public:
|
||||
ObControlInfoEncoder &get_control_info_encoder() { return control_info_encoder_;}
|
||||
ObErrorSyncSysVarEncoder &get_error_sync_sys_var_encoder() { return error_sync_sys_var_encoder_;}
|
||||
ObSequenceCurrvalEncoder &get_sequence_currval_encoder() { return sequence_currval_encoder_; }
|
||||
ObQueryInfoEncoder &get_query_info_encoder() { return query_info_encoder_; }
|
||||
ObContextsMap &get_contexts_map() { return contexts_map_; }
|
||||
ObSequenceCurrvalMap &get_sequence_currval_map() { return sequence_currval_map_; }
|
||||
ObDBlinkSequenceIdMap &get_dblink_sequence_id_map() { return dblink_sequence_id_map_; }
|
||||
@ -1302,6 +1321,7 @@ public:
|
||||
ObOptimizerTraceImpl& get_optimizer_tracer() { return optimizer_tracer_; }
|
||||
public:
|
||||
bool has_tx_level_temp_table() const { return tx_desc_ && tx_desc_->with_temporary_table(); }
|
||||
void set_affected_rows_is_changed(int64_t affected_rows);
|
||||
private:
|
||||
int close_all_ps_stmt();
|
||||
void destroy_contexts_map(ObContextsMap &map, common::ObIAllocator &alloc);
|
||||
@ -1479,7 +1499,8 @@ private:
|
||||
&txn_participants_info_encoder_,
|
||||
&txn_extra_info_encoder_,
|
||||
&sequence_currval_encoder_,
|
||||
&error_sync_sys_var_encoder_
|
||||
&error_sync_sys_var_encoder_,
|
||||
&query_info_encoder_,
|
||||
};
|
||||
ObSysVarEncoder sys_var_encoder_;
|
||||
//ObUserVarEncoder usr_var_encoder_;
|
||||
@ -1493,6 +1514,7 @@ private:
|
||||
ObTxnExtraInfoEncoder txn_extra_info_encoder_;
|
||||
ObSequenceCurrvalEncoder sequence_currval_encoder_;
|
||||
ObErrorSyncSysVarEncoder error_sync_sys_var_encoder_;
|
||||
ObQueryInfoEncoder query_info_encoder_;
|
||||
public:
|
||||
void post_sync_session_info();
|
||||
void prep_txn_free_route_baseline(bool reset_audit = true);
|
||||
@ -1542,6 +1564,7 @@ private:
|
||||
int64_t current_dblink_sequence_id_;
|
||||
common::ObSEArray<ObSequenceSchema*, 2> dblink_sequence_schemas_;
|
||||
bool client_non_standard_;
|
||||
bool is_session_sync_support_; // session_sync_support flag.
|
||||
};
|
||||
|
||||
inline bool ObSQLSessionInfo::is_terminate(int &ret) const
|
||||
@ -1568,6 +1591,13 @@ inline bool ObSQLSessionInfo::is_terminate(int &ret) const
|
||||
return bret;
|
||||
}
|
||||
|
||||
inline void ObSQLSessionInfo::set_affected_rows_is_changed(int64_t affected_rows)
|
||||
{
|
||||
if (affected_rows != get_affected_rows()) {
|
||||
query_info_encoder_.is_changed_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace sql
|
||||
} // namespace oceanbase
|
||||
|
||||
|
||||
Reference in New Issue
Block a user