diff --git a/src/observer/mysql/ob_mysql_end_trans_cb.cpp b/src/observer/mysql/ob_mysql_end_trans_cb.cpp index dfe78cbdc0..1d449cceda 100644 --- a/src/observer/mysql/ob_mysql_end_trans_cb.cpp +++ b/src/observer/mysql/ob_mysql_end_trans_cb.cpp @@ -60,12 +60,13 @@ int ObSqlEndTransCb::set_packet_param(const sql::ObEndTransCbPacketParam &pkt_pa int ObSqlEndTransCb::init(ObMPPacketSender& packet_sender, sql::ObSQLSessionInfo *sess_info, int32_t stmt_id, - uint64_t params_num) + uint64_t params_num, + int64_t com_offset) { sess_info_ = sess_info; stmt_id_ = stmt_id; params_num_ = params_num; - return packet_sender_.clone_from(packet_sender); + return packet_sender_.clone_from(packet_sender, com_offset); } void ObSqlEndTransCb::callback(int cb_param, const transaction::ObTransID &trans_id) diff --git a/src/observer/mysql/ob_mysql_end_trans_cb.h b/src/observer/mysql/ob_mysql_end_trans_cb.h index 6e0094c238..60e6e4440c 100644 --- a/src/observer/mysql/ob_mysql_end_trans_cb.h +++ b/src/observer/mysql/ob_mysql_end_trans_cb.h @@ -54,7 +54,8 @@ public: int init(ObMPPacketSender& packet_sender, sql::ObSQLSessionInfo *sess_info, int32_t stmt_id = 0, - uint64_t params_num = 0); + uint64_t params_num = 0, + int64_t com_offset = 0); int set_packet_param(const sql::ObEndTransCbPacketParam &pkt_param); void destroy(); void reset(); diff --git a/src/observer/mysql/obmp_packet_sender.cpp b/src/observer/mysql/obmp_packet_sender.cpp index aa49e39d4f..2c2c33948d 100644 --- a/src/observer/mysql/obmp_packet_sender.cpp +++ b/src/observer/mysql/obmp_packet_sender.cpp @@ -110,15 +110,16 @@ int ObMPPacketSender::init(rpc::ObRequest *req) bool is_conn_valid = true; bool req_has_wokenup = false; int64_t receive_ts = req->get_receive_timestamp(); - ret = do_init(req, pkt_seq + 1, is_conn_valid, req_has_wokenup, receive_ts); + ret = do_init(req, pkt_seq + 1, pkt_seq + 1, is_conn_valid, req_has_wokenup, receive_ts); } return ret; } -int ObMPPacketSender::clone_from(ObMPPacketSender& that) +int ObMPPacketSender::clone_from(ObMPPacketSender& that, int64_t com_offset) { int ret = OB_SUCCESS; - if (OB_FAIL(do_init(that.req_, that.seq_, that.conn_valid_, that.req_has_wokenup_, that.query_receive_ts_))) { + if (OB_FAIL(do_init(that.req_, that.seq_, that.comp_context_.seq_ + com_offset, + that.conn_valid_, that.req_has_wokenup_, that.query_receive_ts_))) { SERVER_LOG(ERROR, "clone packet sender fail", K(ret)); } else { comp_context_.is_checksum_off_ = that.comp_context_.is_checksum_off_; @@ -129,6 +130,7 @@ int ObMPPacketSender::clone_from(ObMPPacketSender& that) int ObMPPacketSender::do_init(rpc::ObRequest *req, uint8_t packet_seq, + uint8_t comp_seq, bool conn_status, bool req_has_wokenup, int64_t query_receive_ts) @@ -152,7 +154,7 @@ int ObMPPacketSender::do_init(rpc::ObRequest *req, // init comp_context comp_context_.reset(); comp_context_.type_ = conn->get_compress_type(); - comp_context_.seq_ = seq_; + comp_context_.seq_ = comp_seq; comp_context_.sessid_ = sessid_; comp_context_.conn_ = conn; @@ -161,7 +163,7 @@ int ObMPPacketSender::do_init(rpc::ObRequest *req, if (is_proto20_supported) { proto20_context_.reset(); proto20_context_.is_proto20_used_ = is_proto20_supported; - proto20_context_.comp_seq_ = seq_; + proto20_context_.comp_seq_ = comp_seq; proto20_context_.request_id_ = conn->proto20_pkt_context_.proto20_last_request_id_; proto20_context_.proto20_seq_ = static_cast(conn->proto20_pkt_context_.proto20_last_pkt_seq_ + 1); proto20_context_.header_len_ = OB20_PROTOCOL_HEADER_LENGTH + OB_MYSQL_COMPRESSED_HEADER_SIZE; diff --git a/src/observer/mysql/obmp_packet_sender.h b/src/observer/mysql/obmp_packet_sender.h index 8948a18dad..9c16fee1b0 100644 --- a/src/observer/mysql/obmp_packet_sender.h +++ b/src/observer/mysql/obmp_packet_sender.h @@ -150,10 +150,11 @@ public: virtual bool need_send_extra_ok_packet() override { return OB_NOT_NULL(get_conn()) && get_conn()->need_send_extra_ok_packet(); } virtual int flush_buffer(const bool is_last); - int clone_from(ObMPPacketSender& that); + int clone_from(ObMPPacketSender& that, int64_t com_offset = 0/*for prexecute it will be 1*/); int init(rpc::ObRequest* req); int do_init(rpc::ObRequest *req, uint8_t packet_seq, + uint8_t comp_seq, bool conn_status, bool req_has_wokenup, int64_t query_receive_ts); @@ -169,6 +170,7 @@ public: int clean_buffer(); bool has_pl(); int alloc_ezbuf(); + int64_t get_comp_seq() { return comp_context_.seq_; } private: int init_read_handle(); diff --git a/src/observer/mysql/obmp_stmt_execute.cpp b/src/observer/mysql/obmp_stmt_execute.cpp index 6a77cdde44..951ab865ba 100644 --- a/src/observer/mysql/obmp_stmt_execute.cpp +++ b/src/observer/mysql/obmp_stmt_execute.cpp @@ -1446,7 +1446,8 @@ int ObMPStmtExecute::response_result( // NOTE: sql_end_cb必须在drv.response_result()之前初始化好 ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb(); if (OB_FAIL(sql_end_cb.init(packet_sender_, &session, - stmt_id_, params_num_))) { + stmt_id_, params_num_, + is_prexecute() ? packet_sender_.get_comp_seq() : 0))) { LOG_WARN("failed to init sql end callback", K(ret)); } else if (OB_FAIL(drv.response_result(result))) { LOG_WARN("fail response async result", K(ret)); @@ -1472,7 +1473,8 @@ int ObMPStmtExecute::response_result( ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb(); ObAsyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute()); if (OB_FAIL(sql_end_cb.init(packet_sender_, &session, - stmt_id_, params_num_))) { + stmt_id_, params_num_, + is_prexecute() ? packet_sender_.get_comp_seq() : 0))) { LOG_WARN("failed to init sql end callback", K(ret)); } else if (OB_FAIL(drv.response_result(result))) { LOG_WARN("fail response async result", K(ret));