From 1ae305f255558de783e6ce47f701da7755569af8 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 7 Nov 2022 17:05:36 +0000 Subject: [PATCH] [CP] adjust update last packet interface --- src/observer/mysql/ob_mysql_end_trans_cb.cpp | 5 ++++- src/observer/mysql/ob_sync_cmd_driver.cpp | 10 +++++++--- src/observer/mysql/ob_sync_plan_driver.cpp | 5 +++-- src/observer/mysql/obmp_base.h | 2 +- src/observer/mysql/obmp_packet_sender.cpp | 12 ++++++++++++ src/observer/mysql/obmp_packet_sender.h | 11 +++-------- src/observer/mysql/obmp_statistic.cpp | 8 ++++++-- src/observer/mysql/obmp_stmt_execute.cpp | 7 +++++-- src/observer/mysql/obmp_stmt_fetch.cpp | 9 +++++---- src/observer/mysql/obmp_stmt_get_piece_data.cpp | 5 +++-- src/observer/mysql/obmp_stmt_prepare.cpp | 5 +++-- 11 files changed, 52 insertions(+), 27 deletions(-) diff --git a/src/observer/mysql/ob_mysql_end_trans_cb.cpp b/src/observer/mysql/ob_mysql_end_trans_cb.cpp index 60a038d76..46318eaee 100644 --- a/src/observer/mysql/ob_mysql_end_trans_cb.cpp +++ b/src/observer/mysql/ob_mysql_end_trans_cb.cpp @@ -102,8 +102,11 @@ void ObSqlEndTransCb::callback(int cb_param) //network problem, callback will still be called tmp_ret = OB_CONNECT_ERROR; SERVER_LOG(INFO, "connection is invalid", "ret", tmp_ret); + } else if (OB_SUCCESS != packet_sender_.alloc_ezbuf()) { + LOG_WARN("failed to alloc easy buf"); + } else if (OB_SUCCESS != packet_sender_.update_last_pkt_pos()) { + LOG_WARN("failed to update last packet pos"); } else { - packet_sender_.update_last_pkt_pos(); session_info->set_show_warnings_buf(cb_param); if (OB_SUCCESS == cb_param) { //ok pakcet diff --git a/src/observer/mysql/ob_sync_cmd_driver.cpp b/src/observer/mysql/ob_sync_cmd_driver.cpp index c32ccb4c3..4436ce9e7 100644 --- a/src/observer/mysql/ob_sync_cmd_driver.cpp +++ b/src/observer/mysql/ob_sync_cmd_driver.cpp @@ -71,10 +71,14 @@ int ObSyncCmdDriver::send_eof_packet(bool has_more_result) // for proxy // in multi-stmt, send extra ok packet in the last stmt(has no more result) - if (!is_prexecute_ && !has_more_result) { - sender_.update_last_pkt_pos(); + if (!is_prexecute_ && !has_more_result + && OB_FAIL(sender_.update_last_pkt_pos())) { + LOG_WARN("failed to update last packet pos", K(ret)); } - if (OB_FAIL(sender_.response_packet(eofp, &session_))) { + + if (OB_FAIL(ret)) { + // do nothing + } else if (OB_FAIL(sender_.response_packet(eofp, &session_))) { LOG_WARN("response packet fail", K(ret), K(has_more_result)); } return ret; diff --git a/src/observer/mysql/ob_sync_plan_driver.cpp b/src/observer/mysql/ob_sync_plan_driver.cpp index a1ad23c0c..6018d3d9e 100644 --- a/src/observer/mysql/ob_sync_plan_driver.cpp +++ b/src/observer/mysql/ob_sync_plan_driver.cpp @@ -150,8 +150,9 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet &result) // for proxy // in multi-stmt, send extra ok packet in the last stmt(has no more result) - if (!is_prexecute_ && !result.has_more_result()) { - sender_.update_last_pkt_pos(); + if (!is_prexecute_ && !result.has_more_result() + && OB_FAIL(sender_.update_last_pkt_pos())) { + LOG_WARN("failed to update last packet pos", K(ret)); } if (OB_SUCC(ret) && !result.get_is_com_filed_list() && OB_FAIL(sender_.response_packet(eofp, &result.get_session()))) { diff --git a/src/observer/mysql/obmp_base.h b/src/observer/mysql/obmp_base.h index 986a2632d..679949ca9 100644 --- a/src/observer/mysql/obmp_base.h +++ b/src/observer/mysql/obmp_base.h @@ -51,7 +51,7 @@ protected: bool is_conn_valid() const { return packet_sender_.is_conn_valid(); } - virtual void update_last_pkt_pos() { return packet_sender_.update_last_pkt_pos(); } + virtual int update_last_pkt_pos() { return packet_sender_.update_last_pkt_pos(); } virtual bool need_send_extra_ok_packet() { return packet_sender_.need_send_extra_ok_packet(); } diff --git a/src/observer/mysql/obmp_packet_sender.cpp b/src/observer/mysql/obmp_packet_sender.cpp index 99210ac06..66f1c541a 100644 --- a/src/observer/mysql/obmp_packet_sender.cpp +++ b/src/observer/mysql/obmp_packet_sender.cpp @@ -691,6 +691,18 @@ ObSMConnection* ObMPPacketSender::get_conn() const return conn; } +int ObMPPacketSender::update_last_pkt_pos() +{ + int ret = OB_SUCCESS; + if (NULL == ez_buf_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ez buf is null and cannot update last pkt pos for compress protocol", K(ret)); + } else { + comp_context_.update_last_pkt_pos(ez_buf_->last); + } + return ret; +} + void ObMPPacketSender::force_disconnect() { LOG_WARN("force disconnect", K(lbt())); diff --git a/src/observer/mysql/obmp_packet_sender.h b/src/observer/mysql/obmp_packet_sender.h index ba06b9e7d..fb60c31c5 100644 --- a/src/observer/mysql/obmp_packet_sender.h +++ b/src/observer/mysql/obmp_packet_sender.h @@ -92,7 +92,7 @@ public: virtual ~ObIMPPacketSender() { } virtual void disconnect() = 0; virtual void force_disconnect() = 0; - virtual void update_last_pkt_pos() = 0; + virtual int update_last_pkt_pos() = 0; virtual int response_packet(obmysql::ObMySQLPacket &pkt, sql::ObSQLSessionInfo* session) = 0; virtual int send_error_packet(int err, const char* errmsg, @@ -113,12 +113,7 @@ public: void reset(); virtual void disconnect() override; virtual void force_disconnect() override; - virtual void update_last_pkt_pos() override - { - if (NULL != ez_buf_) { - comp_context_.update_last_pkt_pos(ez_buf_->last); - } - } + virtual int update_last_pkt_pos() override; virtual int response_packet(obmysql::ObMySQLPacket &pkt, sql::ObSQLSessionInfo* session) override; // when connect with proxy, need to append extra ok packet to last statu packet int response_compose_packet(obmysql::ObMySQLPacket &pkt, @@ -153,12 +148,12 @@ public: bool is_disable_response() const { return req_has_wokenup_; } int clean_buffer(); bool has_pl(); + int alloc_ezbuf(); private: static const int64_t MAX_TRY_STEPS = 8; static int64_t TRY_EZ_BUF_SIZES[MAX_TRY_STEPS]; - int alloc_ezbuf(); int try_encode_with(obmysql::ObMySQLPacket &pkt, int64_t current_size, int64_t &seri_size, diff --git a/src/observer/mysql/obmp_statistic.cpp b/src/observer/mysql/obmp_statistic.cpp index 668f45af6..2dad98eca 100644 --- a/src/observer/mysql/obmp_statistic.cpp +++ b/src/observer/mysql/obmp_statistic.cpp @@ -32,9 +32,13 @@ int ObMPStatistic::process() // const common::ObString tmp_string("Active threads not support"); obmysql::OMPKString pkt(tmp_string); - packet_sender_.update_last_pkt_pos(); ObSMConnection *conn = NULL; - if (OB_FAIL(response_packet(pkt, NULL))) { + + if (OB_FAIL(packet_sender_.alloc_ezbuf())) { + LOG_WARN("failed to alloc easy buf", K(ret)); + } else if (OB_FAIL(packet_sender_.update_last_pkt_pos())) { + LOG_WARN("failed to update last packet pos", K(ret)); + } else if (OB_FAIL(response_packet(pkt, NULL))) { RPC_OBMYSQL_LOG(WARN, "fail to response statistic packet", K(ret)); } else if (OB_ISNULL(conn = get_conn())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/observer/mysql/obmp_stmt_execute.cpp b/src/observer/mysql/obmp_stmt_execute.cpp index 915b7ee26..2894dfb42 100644 --- a/src/observer/mysql/obmp_stmt_execute.cpp +++ b/src/observer/mysql/obmp_stmt_execute.cpp @@ -2755,9 +2755,12 @@ int ObMPStmtExecute::response_query_header(ObSQLSessionInfo &session, pl::ObDbms // for obproxy, 最后一次要把 eof和OK包放一起 if (OB_SUCC(ret)) { - OX (update_last_pkt_pos()); - if (OB_FAIL(response_packet(eofp, &session))) { + if (OB_FAIL(update_last_pkt_pos())) { + LOG_WARN("failed to update last packet pos", K(ret)); + } else if (OB_FAIL(response_packet(eofp, &session))) { LOG_WARN("response packet fail", K(ret)); + } else { + // do nothing } } // for obproxy diff --git a/src/observer/mysql/obmp_stmt_fetch.cpp b/src/observer/mysql/obmp_stmt_fetch.cpp index 595d81a2d..879f84163 100644 --- a/src/observer/mysql/obmp_stmt_fetch.cpp +++ b/src/observer/mysql/obmp_stmt_fetch.cpp @@ -574,10 +574,11 @@ int ObMPStmtFetch::response_result(pl::ObPLCursorInfo &cursor, // for proxy // in multi-stmt, send extra ok packet in the last stmt(has no more result) if (OB_SUCC(ret)) { - if (!has_ok_packet()) { - update_last_pkt_pos(); - } - if (OB_FAIL(response_packet(eofp, &session))) { + if (OB_FAIL(packet_sender_.alloc_ezbuf())) { + LOG_WARN("failed to alloc easy buf", K(ret)); + } else if (!has_ok_packet() && OB_FAIL(update_last_pkt_pos())) { + LOG_WARN("failed to update last packet pos", K(ret)); + } else if (OB_FAIL(response_packet(eofp, &session))) { LOG_WARN("response packet fail", K(ret)); } else if (last_row && !cursor.is_scrollable() && !cursor.is_streaming() diff --git a/src/observer/mysql/obmp_stmt_get_piece_data.cpp b/src/observer/mysql/obmp_stmt_get_piece_data.cpp index 3cfb9676e..53af9d16e 100644 --- a/src/observer/mysql/obmp_stmt_get_piece_data.cpp +++ b/src/observer/mysql/obmp_stmt_get_piece_data.cpp @@ -315,9 +315,10 @@ int ObMPStmtGetPieceData::response_result(ObSQLSessionInfo &session) if (OB_FAIL(response_packet(piece_packet, &session))) { LOG_WARN("response piece packet fail.", K(ret), K(stmt_id_), K(column_id_)); } else { - update_last_pkt_pos(); ObPiece *piece = NULL; - if (OB_FAIL(piece_cache->get_piece(stmt_id_, column_id_, piece))) { + if (OB_FAIL(update_last_pkt_pos())) { + LOG_WARN("failed to update last packet pos", K(ret)); + } else if (OB_FAIL(piece_cache->get_piece(stmt_id_, column_id_, piece))) { LOG_WARN("get piece fail", K(stmt_id_), K(column_id_), K(ret) ); } else if (NULL != piece) { uint64_t count = NULL == piece->get_buffer_array() diff --git a/src/observer/mysql/obmp_stmt_prepare.cpp b/src/observer/mysql/obmp_stmt_prepare.cpp index 0f515a425..ace8f70e6 100644 --- a/src/observer/mysql/obmp_stmt_prepare.cpp +++ b/src/observer/mysql/obmp_stmt_prepare.cpp @@ -629,8 +629,9 @@ int ObMPStmtPrepare::send_column_packet(const ObSQLSessionInfo &session, ret = OB_SUCCESS; } if (OB_SUCC(ret)) { - packet_sender_.update_last_pkt_pos(); - if (OB_FAIL(send_eof_packet(session, result))) { + if (OB_FAIL(packet_sender_.update_last_pkt_pos())) { + LOG_WARN("failed to update last packet pos", K(ret)); + } else if (OB_FAIL(send_eof_packet(session, result))) { LOG_WARN("send eof field failed", K(ret)); } }