From bbe70ee03c539c1d5b7e1ad40505f11a5ff2f995 Mon Sep 17 00:00:00 2001 From: lucky-sinx <2549261744@qq.com> Date: Thu, 2 Jan 2025 07:45:27 +0000 Subject: [PATCH] fix for err+ok not in one 2.0pkt --- .../src/rpc/obmysql/ob_2_0_protocol_utils.cpp | 7 ++++++- deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.h | 3 ++- src/observer/mysql/obmp_packet_sender.cpp | 14 ++++++++------ src/observer/mysql/obmp_packet_sender.h | 3 ++- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.cpp b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.cpp index 810688174..0ed206320 100644 --- a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.cpp @@ -209,7 +209,7 @@ inline int ObProto20Utils::do_proto20_packet_encode(ObProtoEncodeParam ¶m) } } LOG_DEBUG("proto20 encode", "current next step", - get_proto20_encode_step_name(proto20_context.next_step_)); + get_proto20_encode_step_name(proto20_context.next_step_), K(need_break), K(param.need_flush_)); } if (OB_SUCC(ret)) { @@ -496,6 +496,11 @@ inline int ObProto20Utils::fill_proto20_payload(ObProtoEncodeParam ¶m, bool ret = OB_ERR_UNEXPECTED; LOG_ERROR("impossible", "read_avail", easy_buffer.read_avail_size(), "header len", proto20_context.header_len_, K(ret)); + } else if (param.is_composed_ok_pkt_) { + // 遇到了err+ok,但是ok包过大且已经写入了err包的内容,这里不可以变为FILL_TAILER_STEP状态, + // 否则err和ok就不算在一个包里面了 + param.need_flush_ = true; // break, alloc more memory + need_break = true; } else { // there must be enough space for tailer proto20_context.next_step_ = FILL_TAILER_STEP; diff --git a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.h b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.h index ed6880f77..2ea4b0834 100644 --- a/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.h +++ b/deps/oblib/src/rpc/obmysql/ob_2_0_protocol_utils.h @@ -130,6 +130,7 @@ public: const static int64_t MAX_PROTO20_PAYLOAD_LEN; const static int64_t PROTO20_SPLIT_LEN; + bool is_composed_ok_pkt_; public: ObProtoEncodeParam() @@ -137,7 +138,7 @@ public: seri_size_(0), conn_id_(0), encode_ret_(common::OB_SUCCESS), need_flush_(false), is_last_(false), is_pkt_encoded_(false), large_pkt_buf_(NULL), large_pkt_buf_len_(0), large_pkt_buf_pos_(0), - extra_info_kvs_(NULL), extra_info_ecds_(NULL), conn_(NULL) + extra_info_kvs_(NULL), extra_info_ecds_(NULL), conn_(NULL), is_composed_ok_pkt_(false) {} inline bool is_valid() const diff --git a/src/observer/mysql/obmp_packet_sender.cpp b/src/observer/mysql/obmp_packet_sender.cpp index 62cd50a67..026385939 100644 --- a/src/observer/mysql/obmp_packet_sender.cpp +++ b/src/observer/mysql/obmp_packet_sender.cpp @@ -253,7 +253,6 @@ int ObMPPacketSender::response_compose_packet(obmysql::ObMySQLPacket &pkt, sql::ObSQLSessionInfo* session, bool update_comp_pos) { int ret = OB_SUCCESS; - comp_context_.update_last_pkt_pos(ez_buf_->last); if (OB_FAIL(response_packet(pkt, session))) { LOG_WARN("failed to response packet", K(ret)); @@ -274,7 +273,8 @@ int ObMPPacketSender::response_compose_packet(obmysql::ObMySQLPacket &pkt, } else if (OB_FAIL(try_encode_with(okp, ez_buf_->end - ez_buf_->pos, seri_size, - 0))) { + 0, + true))) { LOG_WARN("failed to encode packet", K(ret)); } else { LOG_DEBUG("succ encode packet", K(okp), K(seri_size)); @@ -925,10 +925,12 @@ int ObMPPacketSender::send_eof_packet(const ObSQLSessionInfo &session, int ObMPPacketSender::try_encode_with(ObMySQLPacket &pkt, int64_t current_size, int64_t &seri_size, - int64_t try_steps) + int64_t try_steps, + bool is_composed_ok_pkt) { int ret = OB_SUCCESS; ObProtoEncodeParam param; + param.is_composed_ok_pkt_ = is_composed_ok_pkt; seri_size = 0; if (OB_ISNULL(ez_buf_)) { ret = OB_ERR_UNEXPECTED; @@ -975,12 +977,12 @@ int ObMPPacketSender::try_encode_with(ObMySQLPacket &pkt, // if failed, try flush ---> alloc larger mem ----> continue encoding int last_ret = param.encode_ret_; - if (need_flush_buffer()) { + if (need_flush_buffer() && !param.is_composed_ok_pkt_) { // try again with same buf size if (OB_FAIL(flush_buffer(false))) { LOG_WARN("failed to flush_buffer", K(ret), K(last_ret)); } else { - ret = try_encode_with(pkt, current_size, seri_size, try_steps); + ret = try_encode_with(pkt, current_size, seri_size, try_steps, is_composed_ok_pkt); } } else { if (try_steps >= MAX_TRY_STEPS) { @@ -997,7 +999,7 @@ int ObMPPacketSender::try_encode_with(ObMySQLPacket &pkt, if (OB_FAIL(resize_ezbuf(new_alloc_size))) { LOG_ERROR("fail to resize_ezbuf", K(ret), K(last_ret)); } else { - ret = try_encode_with(pkt, new_alloc_size, seri_size, try_steps); + ret = try_encode_with(pkt, new_alloc_size, seri_size, try_steps, is_composed_ok_pkt); } } } diff --git a/src/observer/mysql/obmp_packet_sender.h b/src/observer/mysql/obmp_packet_sender.h index 9c16fee1b..fc894f17e 100644 --- a/src/observer/mysql/obmp_packet_sender.h +++ b/src/observer/mysql/obmp_packet_sender.h @@ -183,7 +183,8 @@ private: int try_encode_with(obmysql::ObMySQLPacket &pkt, int64_t current_size, int64_t &seri_size, - int64_t try_steps); + int64_t try_steps, + bool is_composed_ok_pkt = false); int build_encode_param_(obmysql::ObProtoEncodeParam ¶m, obmysql::ObMySQLPacket *pkt, const bool is_last);