Fix: affected row sync

This commit is contained in:
yaojing624
2023-12-26 02:13:24 +00:00
committed by ob-robot
parent c395a8bc73
commit 4aacc8a7e6
5 changed files with 128 additions and 5 deletions

View File

@ -227,6 +227,7 @@ enum SessionSyncInfoType {
SESSION_SYNC_TXN_EXTRA_INFO = 8, // 8: txn dynamic info
SESSION_SYNC_SEQUENCE_CURRVAL = 9, // for sequence currval
SESSION_SYNC_ERROR_SYS_VAR = 10, // for error scene need sync sysvar info
SESSION_SYNC_QUERY_INFO = 11, // for query level session info
SESSION_SYNC_MAX_TYPE,
};
@ -394,6 +395,21 @@ public:
int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) override;
};
class ObQueryInfoEncoder : public ObSessInfoEncoder {
public:
ObQueryInfoEncoder() : ObSessInfoEncoder() {}
virtual ~ObQueryInfoEncoder() {}
virtual int serialize(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override;
virtual int deserialize(ObSQLSessionInfo &sess, const char *buf, const int64_t length, int64_t &pos) override;
virtual int get_serialize_size(ObSQLSessionInfo &sess, int64_t &length) const override;
virtual int fetch_sess_info(ObSQLSessionInfo &sess, char *buf, const int64_t length, int64_t &pos) override;
virtual int64_t get_fetch_sess_info_size(ObSQLSessionInfo& sess) override;
virtual int compare_sess_info(const char* current_sess_buf, int64_t current_sess_length,
const char* last_sess_buf, int64_t last_sess_length) override;
virtual int display_sess_info(ObSQLSessionInfo &sess, const char* current_sess_buf,
int64_t current_sess_length, const char* last_sess_buf, int64_t last_sess_length) override;
};
#define DEF_SESSION_TXN_ENCODER(CLS) \
class CLS final : public ObSessInfoEncoder { \
public: \
@ -908,6 +924,8 @@ public:
inline void set_ob20_protocol(bool is_20protocol) { is_ob20_protocol_ = is_20protocol; }
inline bool is_ob20_protocol() { return is_ob20_protocol_; }
inline void set_session_sync_support(bool is_session_sync_support) { is_session_sync_support_ = is_session_sync_support; }
inline bool is_session_sync_support() { return is_session_sync_support_; }
inline void set_session_var_sync(bool is_session_var_sync)
{ is_session_var_sync_ = is_session_var_sync; }
@ -1139,6 +1157,7 @@ public:
ObControlInfoEncoder &get_control_info_encoder() { return control_info_encoder_;}
ObErrorSyncSysVarEncoder &get_error_sync_sys_var_encoder() { return error_sync_sys_var_encoder_;}
ObSequenceCurrvalEncoder &get_sequence_currval_encoder() { return sequence_currval_encoder_; }
ObQueryInfoEncoder &get_query_info_encoder() { return query_info_encoder_; }
ObContextsMap &get_contexts_map() { return contexts_map_; }
ObSequenceCurrvalMap &get_sequence_currval_map() { return sequence_currval_map_; }
ObDBlinkSequenceIdMap &get_dblink_sequence_id_map() { return dblink_sequence_id_map_; }
@ -1302,6 +1321,7 @@ public:
ObOptimizerTraceImpl& get_optimizer_tracer() { return optimizer_tracer_; }
public:
bool has_tx_level_temp_table() const { return tx_desc_ && tx_desc_->with_temporary_table(); }
void set_affected_rows_is_changed(int64_t affected_rows);
private:
int close_all_ps_stmt();
void destroy_contexts_map(ObContextsMap &map, common::ObIAllocator &alloc);
@ -1479,7 +1499,8 @@ private:
&txn_participants_info_encoder_,
&txn_extra_info_encoder_,
&sequence_currval_encoder_,
&error_sync_sys_var_encoder_
&error_sync_sys_var_encoder_,
&query_info_encoder_,
};
ObSysVarEncoder sys_var_encoder_;
//ObUserVarEncoder usr_var_encoder_;
@ -1493,6 +1514,7 @@ private:
ObTxnExtraInfoEncoder txn_extra_info_encoder_;
ObSequenceCurrvalEncoder sequence_currval_encoder_;
ObErrorSyncSysVarEncoder error_sync_sys_var_encoder_;
ObQueryInfoEncoder query_info_encoder_;
public:
void post_sync_session_info();
void prep_txn_free_route_baseline(bool reset_audit = true);
@ -1542,6 +1564,7 @@ private:
int64_t current_dblink_sequence_id_;
common::ObSEArray<ObSequenceSchema*, 2> dblink_sequence_schemas_;
bool client_non_standard_;
bool is_session_sync_support_; // session_sync_support flag.
};
inline bool ObSQLSessionInfo::is_terminate(int &ret) const
@ -1568,6 +1591,13 @@ inline bool ObSQLSessionInfo::is_terminate(int &ret) const
return bret;
}
inline void ObSQLSessionInfo::set_affected_rows_is_changed(int64_t affected_rows)
{
if (affected_rows != get_affected_rows()) {
query_info_encoder_.is_changed_ = true;
}
}
} // namespace sql
} // namespace oceanbase