[CP] dml comp protocol use wrong seq

This commit is contained in:
LiuYoung00
2024-02-07 20:34:17 +00:00
committed by ob-robot
parent c2511b580f
commit b88eb76b6d
5 changed files with 19 additions and 11 deletions

View File

@ -60,12 +60,13 @@ int ObSqlEndTransCb::set_packet_param(const sql::ObEndTransCbPacketParam &pkt_pa
int ObSqlEndTransCb::init(ObMPPacketSender& packet_sender, int ObSqlEndTransCb::init(ObMPPacketSender& packet_sender,
sql::ObSQLSessionInfo *sess_info, sql::ObSQLSessionInfo *sess_info,
int32_t stmt_id, int32_t stmt_id,
uint64_t params_num) uint64_t params_num,
int64_t com_offset)
{ {
sess_info_ = sess_info; sess_info_ = sess_info;
stmt_id_ = stmt_id; stmt_id_ = stmt_id;
params_num_ = params_num; params_num_ = params_num;
return packet_sender_.clone_from(packet_sender); return packet_sender_.clone_from(packet_sender, com_offset);
} }
void ObSqlEndTransCb::callback(int cb_param, const transaction::ObTransID &trans_id) void ObSqlEndTransCb::callback(int cb_param, const transaction::ObTransID &trans_id)

View File

@ -54,7 +54,8 @@ public:
int init(ObMPPacketSender& packet_sender, int init(ObMPPacketSender& packet_sender,
sql::ObSQLSessionInfo *sess_info, sql::ObSQLSessionInfo *sess_info,
int32_t stmt_id = 0, int32_t stmt_id = 0,
uint64_t params_num = 0); uint64_t params_num = 0,
int64_t com_offset = 0);
int set_packet_param(const sql::ObEndTransCbPacketParam &pkt_param); int set_packet_param(const sql::ObEndTransCbPacketParam &pkt_param);
void destroy(); void destroy();
void reset(); void reset();

View File

@ -110,15 +110,16 @@ int ObMPPacketSender::init(rpc::ObRequest *req)
bool is_conn_valid = true; bool is_conn_valid = true;
bool req_has_wokenup = false; bool req_has_wokenup = false;
int64_t receive_ts = req->get_receive_timestamp(); int64_t receive_ts = req->get_receive_timestamp();
ret = do_init(req, pkt_seq + 1, is_conn_valid, req_has_wokenup, receive_ts); ret = do_init(req, pkt_seq + 1, pkt_seq + 1, is_conn_valid, req_has_wokenup, receive_ts);
} }
return ret; return ret;
} }
int ObMPPacketSender::clone_from(ObMPPacketSender& that) int ObMPPacketSender::clone_from(ObMPPacketSender& that, int64_t com_offset)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_FAIL(do_init(that.req_, that.seq_, that.conn_valid_, that.req_has_wokenup_, that.query_receive_ts_))) { if (OB_FAIL(do_init(that.req_, that.seq_, that.comp_context_.seq_ + com_offset,
that.conn_valid_, that.req_has_wokenup_, that.query_receive_ts_))) {
SERVER_LOG(ERROR, "clone packet sender fail", K(ret)); SERVER_LOG(ERROR, "clone packet sender fail", K(ret));
} else { } else {
comp_context_.is_checksum_off_ = that.comp_context_.is_checksum_off_; comp_context_.is_checksum_off_ = that.comp_context_.is_checksum_off_;
@ -129,6 +130,7 @@ int ObMPPacketSender::clone_from(ObMPPacketSender& that)
int ObMPPacketSender::do_init(rpc::ObRequest *req, int ObMPPacketSender::do_init(rpc::ObRequest *req,
uint8_t packet_seq, uint8_t packet_seq,
uint8_t comp_seq,
bool conn_status, bool conn_status,
bool req_has_wokenup, bool req_has_wokenup,
int64_t query_receive_ts) int64_t query_receive_ts)
@ -152,7 +154,7 @@ int ObMPPacketSender::do_init(rpc::ObRequest *req,
// init comp_context // init comp_context
comp_context_.reset(); comp_context_.reset();
comp_context_.type_ = conn->get_compress_type(); comp_context_.type_ = conn->get_compress_type();
comp_context_.seq_ = seq_; comp_context_.seq_ = comp_seq;
comp_context_.sessid_ = sessid_; comp_context_.sessid_ = sessid_;
comp_context_.conn_ = conn; comp_context_.conn_ = conn;
@ -161,7 +163,7 @@ int ObMPPacketSender::do_init(rpc::ObRequest *req,
if (is_proto20_supported) { if (is_proto20_supported) {
proto20_context_.reset(); proto20_context_.reset();
proto20_context_.is_proto20_used_ = is_proto20_supported; proto20_context_.is_proto20_used_ = is_proto20_supported;
proto20_context_.comp_seq_ = seq_; proto20_context_.comp_seq_ = comp_seq;
proto20_context_.request_id_ = conn->proto20_pkt_context_.proto20_last_request_id_; proto20_context_.request_id_ = conn->proto20_pkt_context_.proto20_last_request_id_;
proto20_context_.proto20_seq_ = static_cast<uint8_t>(conn->proto20_pkt_context_.proto20_last_pkt_seq_ + 1); proto20_context_.proto20_seq_ = static_cast<uint8_t>(conn->proto20_pkt_context_.proto20_last_pkt_seq_ + 1);
proto20_context_.header_len_ = OB20_PROTOCOL_HEADER_LENGTH + OB_MYSQL_COMPRESSED_HEADER_SIZE; proto20_context_.header_len_ = OB20_PROTOCOL_HEADER_LENGTH + OB_MYSQL_COMPRESSED_HEADER_SIZE;

View File

@ -150,10 +150,11 @@ public:
virtual bool need_send_extra_ok_packet() override virtual bool need_send_extra_ok_packet() override
{ return OB_NOT_NULL(get_conn()) && get_conn()->need_send_extra_ok_packet(); } { return OB_NOT_NULL(get_conn()) && get_conn()->need_send_extra_ok_packet(); }
virtual int flush_buffer(const bool is_last); virtual int flush_buffer(const bool is_last);
int clone_from(ObMPPacketSender& that); int clone_from(ObMPPacketSender& that, int64_t com_offset = 0/*for prexecute it will be 1*/);
int init(rpc::ObRequest* req); int init(rpc::ObRequest* req);
int do_init(rpc::ObRequest *req, int do_init(rpc::ObRequest *req,
uint8_t packet_seq, uint8_t packet_seq,
uint8_t comp_seq,
bool conn_status, bool conn_status,
bool req_has_wokenup, bool req_has_wokenup,
int64_t query_receive_ts); int64_t query_receive_ts);
@ -169,6 +170,7 @@ public:
int clean_buffer(); int clean_buffer();
bool has_pl(); bool has_pl();
int alloc_ezbuf(); int alloc_ezbuf();
int64_t get_comp_seq() { return comp_context_.seq_; }
private: private:
int init_read_handle(); int init_read_handle();

View File

@ -1446,7 +1446,8 @@ int ObMPStmtExecute::response_result(
// NOTE: sql_end_cb必须在drv.response_result()之前初始化好 // NOTE: sql_end_cb必须在drv.response_result()之前初始化好
ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb(); ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
if (OB_FAIL(sql_end_cb.init(packet_sender_, &session, if (OB_FAIL(sql_end_cb.init(packet_sender_, &session,
stmt_id_, params_num_))) { stmt_id_, params_num_,
is_prexecute() ? packet_sender_.get_comp_seq() : 0))) {
LOG_WARN("failed to init sql end callback", K(ret)); LOG_WARN("failed to init sql end callback", K(ret));
} else if (OB_FAIL(drv.response_result(result))) { } else if (OB_FAIL(drv.response_result(result))) {
LOG_WARN("fail response async result", K(ret)); LOG_WARN("fail response async result", K(ret));
@ -1472,7 +1473,8 @@ int ObMPStmtExecute::response_result(
ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb(); ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
ObAsyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute()); ObAsyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute());
if (OB_FAIL(sql_end_cb.init(packet_sender_, &session, if (OB_FAIL(sql_end_cb.init(packet_sender_, &session,
stmt_id_, params_num_))) { stmt_id_, params_num_,
is_prexecute() ? packet_sender_.get_comp_seq() : 0))) {
LOG_WARN("failed to init sql end callback", K(ret)); LOG_WARN("failed to init sql end callback", K(ret));
} else if (OB_FAIL(drv.response_result(result))) { } else if (OB_FAIL(drv.response_result(result))) {
LOG_WARN("fail response async result", K(ret)); LOG_WARN("fail response async result", K(ret));