[CP] dml comp protocol use wrong seq

This commit is contained in:
LiuYoung00
2023-12-14 08:13:10 +00:00
committed by ob-robot
parent f7cd32be94
commit 38d4eb7ccc
5 changed files with 19 additions and 11 deletions

View File

@ -60,12 +60,13 @@ int ObSqlEndTransCb::set_packet_param(const sql::ObEndTransCbPacketParam &pkt_pa
int ObSqlEndTransCb::init(ObMPPacketSender& packet_sender,
sql::ObSQLSessionInfo *sess_info,
int32_t stmt_id,
uint64_t params_num)
uint64_t params_num,
int64_t com_offset)
{
sess_info_ = sess_info;
stmt_id_ = stmt_id;
params_num_ = params_num;
return packet_sender_.clone_from(packet_sender);
return packet_sender_.clone_from(packet_sender, com_offset);
}
void ObSqlEndTransCb::callback(int cb_param, const transaction::ObTransID &trans_id)

View File

@ -54,7 +54,8 @@ public:
int init(ObMPPacketSender& packet_sender,
sql::ObSQLSessionInfo *sess_info,
int32_t stmt_id = 0,
uint64_t params_num = 0);
uint64_t params_num = 0,
int64_t com_offset = 0);
int set_packet_param(const sql::ObEndTransCbPacketParam &pkt_param);
void destroy();
void reset();

View File

@ -110,15 +110,16 @@ int ObMPPacketSender::init(rpc::ObRequest *req)
bool is_conn_valid = true;
bool req_has_wokenup = false;
int64_t receive_ts = req->get_receive_timestamp();
ret = do_init(req, pkt_seq + 1, is_conn_valid, req_has_wokenup, receive_ts);
ret = do_init(req, pkt_seq + 1, pkt_seq + 1, is_conn_valid, req_has_wokenup, receive_ts);
}
return ret;
}
int ObMPPacketSender::clone_from(ObMPPacketSender& that)
int ObMPPacketSender::clone_from(ObMPPacketSender& that, int64_t com_offset)
{
int ret = OB_SUCCESS;
if (OB_FAIL(do_init(that.req_, that.seq_, that.conn_valid_, that.req_has_wokenup_, that.query_receive_ts_))) {
if (OB_FAIL(do_init(that.req_, that.seq_, that.comp_context_.seq_ + com_offset,
that.conn_valid_, that.req_has_wokenup_, that.query_receive_ts_))) {
SERVER_LOG(ERROR, "clone packet sender fail", K(ret));
} else {
comp_context_.is_checksum_off_ = that.comp_context_.is_checksum_off_;
@ -129,6 +130,7 @@ int ObMPPacketSender::clone_from(ObMPPacketSender& that)
int ObMPPacketSender::do_init(rpc::ObRequest *req,
uint8_t packet_seq,
uint8_t comp_seq,
bool conn_status,
bool req_has_wokenup,
int64_t query_receive_ts)
@ -152,7 +154,7 @@ int ObMPPacketSender::do_init(rpc::ObRequest *req,
// init comp_context
comp_context_.reset();
comp_context_.type_ = conn->get_compress_type();
comp_context_.seq_ = seq_;
comp_context_.seq_ = comp_seq;
comp_context_.sessid_ = sessid_;
comp_context_.conn_ = conn;
@ -161,7 +163,7 @@ int ObMPPacketSender::do_init(rpc::ObRequest *req,
if (is_proto20_supported) {
proto20_context_.reset();
proto20_context_.is_proto20_used_ = is_proto20_supported;
proto20_context_.comp_seq_ = seq_;
proto20_context_.comp_seq_ = comp_seq;
proto20_context_.request_id_ = conn->proto20_pkt_context_.proto20_last_request_id_;
proto20_context_.proto20_seq_ = static_cast<uint8_t>(conn->proto20_pkt_context_.proto20_last_pkt_seq_ + 1);
proto20_context_.header_len_ = OB20_PROTOCOL_HEADER_LENGTH + OB_MYSQL_COMPRESSED_HEADER_SIZE;

View File

@ -150,10 +150,11 @@ public:
virtual bool need_send_extra_ok_packet() override
{ return OB_NOT_NULL(get_conn()) && get_conn()->need_send_extra_ok_packet(); }
virtual int flush_buffer(const bool is_last);
int clone_from(ObMPPacketSender& that);
int clone_from(ObMPPacketSender& that, int64_t com_offset = 0/*for prexecute it will be 1*/);
int init(rpc::ObRequest* req);
int do_init(rpc::ObRequest *req,
uint8_t packet_seq,
uint8_t comp_seq,
bool conn_status,
bool req_has_wokenup,
int64_t query_receive_ts);
@ -169,6 +170,7 @@ public:
int clean_buffer();
bool has_pl();
int alloc_ezbuf();
int64_t get_comp_seq() { return comp_context_.seq_; }
private:
int init_read_handle();

View File

@ -1446,7 +1446,8 @@ int ObMPStmtExecute::response_result(
// NOTE: sql_end_cb必须在drv.response_result()之前初始化好
ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
if (OB_FAIL(sql_end_cb.init(packet_sender_, &session,
stmt_id_, params_num_))) {
stmt_id_, params_num_,
is_prexecute() ? packet_sender_.get_comp_seq() : 0))) {
LOG_WARN("failed to init sql end callback", K(ret));
} else if (OB_FAIL(drv.response_result(result))) {
LOG_WARN("fail response async result", K(ret));
@ -1472,7 +1473,8 @@ int ObMPStmtExecute::response_result(
ObSqlEndTransCb &sql_end_cb = session.get_mysql_end_trans_cb();
ObAsyncCmdDriver drv(gctx_, ctx_, session, retry_ctrl_, *this, is_prexecute());
if (OB_FAIL(sql_end_cb.init(packet_sender_, &session,
stmt_id_, params_num_))) {
stmt_id_, params_num_,
is_prexecute() ? packet_sender_.get_comp_seq() : 0))) {
LOG_WARN("failed to init sql end callback", K(ret));
} else if (OB_FAIL(drv.response_result(result))) {
LOG_WARN("fail response async result", K(ret));