[CP] adjust update last packet interface

This commit is contained in:
obdev
2022-11-07 17:05:36 +00:00
committed by wangzelin.wzl
parent 05fd6fcff7
commit 1ae305f255
11 changed files with 52 additions and 27 deletions

View File

@ -102,8 +102,11 @@ void ObSqlEndTransCb::callback(int cb_param)
//network problem, callback will still be called //network problem, callback will still be called
tmp_ret = OB_CONNECT_ERROR; tmp_ret = OB_CONNECT_ERROR;
SERVER_LOG(INFO, "connection is invalid", "ret", tmp_ret); SERVER_LOG(INFO, "connection is invalid", "ret", tmp_ret);
} else if (OB_SUCCESS != packet_sender_.alloc_ezbuf()) {
LOG_WARN("failed to alloc easy buf");
} else if (OB_SUCCESS != packet_sender_.update_last_pkt_pos()) {
LOG_WARN("failed to update last packet pos");
} else { } else {
packet_sender_.update_last_pkt_pos();
session_info->set_show_warnings_buf(cb_param); session_info->set_show_warnings_buf(cb_param);
if (OB_SUCCESS == cb_param) { if (OB_SUCCESS == cb_param) {
//ok pakcet //ok pakcet

View File

@ -71,10 +71,14 @@ int ObSyncCmdDriver::send_eof_packet(bool has_more_result)
// for proxy // for proxy
// in multi-stmt, send extra ok packet in the last stmt(has no more result) // in multi-stmt, send extra ok packet in the last stmt(has no more result)
if (!is_prexecute_ && !has_more_result) { if (!is_prexecute_ && !has_more_result
sender_.update_last_pkt_pos(); && OB_FAIL(sender_.update_last_pkt_pos())) {
LOG_WARN("failed to update last packet pos", K(ret));
} }
if (OB_FAIL(sender_.response_packet(eofp, &session_))) {
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(sender_.response_packet(eofp, &session_))) {
LOG_WARN("response packet fail", K(ret), K(has_more_result)); LOG_WARN("response packet fail", K(ret), K(has_more_result));
} }
return ret; return ret;

View File

@ -150,8 +150,9 @@ int ObSyncPlanDriver::response_result(ObMySQLResultSet &result)
// for proxy // for proxy
// in multi-stmt, send extra ok packet in the last stmt(has no more result) // in multi-stmt, send extra ok packet in the last stmt(has no more result)
if (!is_prexecute_ && !result.has_more_result()) { if (!is_prexecute_ && !result.has_more_result()
sender_.update_last_pkt_pos(); && OB_FAIL(sender_.update_last_pkt_pos())) {
LOG_WARN("failed to update last packet pos", K(ret));
} }
if (OB_SUCC(ret) && !result.get_is_com_filed_list() && if (OB_SUCC(ret) && !result.get_is_com_filed_list() &&
OB_FAIL(sender_.response_packet(eofp, &result.get_session()))) { OB_FAIL(sender_.response_packet(eofp, &result.get_session()))) {

View File

@ -51,7 +51,7 @@ protected:
bool is_conn_valid() const { return packet_sender_.is_conn_valid(); } bool is_conn_valid() const { return packet_sender_.is_conn_valid(); }
virtual void update_last_pkt_pos() { return packet_sender_.update_last_pkt_pos(); } virtual int update_last_pkt_pos() { return packet_sender_.update_last_pkt_pos(); }
virtual bool need_send_extra_ok_packet() { return packet_sender_.need_send_extra_ok_packet(); } virtual bool need_send_extra_ok_packet() { return packet_sender_.need_send_extra_ok_packet(); }

View File

@ -691,6 +691,18 @@ ObSMConnection* ObMPPacketSender::get_conn() const
return conn; return conn;
} }
int ObMPPacketSender::update_last_pkt_pos()
{
int ret = OB_SUCCESS;
if (NULL == ez_buf_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ez buf is null and cannot update last pkt pos for compress protocol", K(ret));
} else {
comp_context_.update_last_pkt_pos(ez_buf_->last);
}
return ret;
}
void ObMPPacketSender::force_disconnect() void ObMPPacketSender::force_disconnect()
{ {
LOG_WARN("force disconnect", K(lbt())); LOG_WARN("force disconnect", K(lbt()));

View File

@ -92,7 +92,7 @@ public:
virtual ~ObIMPPacketSender() { } virtual ~ObIMPPacketSender() { }
virtual void disconnect() = 0; virtual void disconnect() = 0;
virtual void force_disconnect() = 0; virtual void force_disconnect() = 0;
virtual void update_last_pkt_pos() = 0; virtual int update_last_pkt_pos() = 0;
virtual int response_packet(obmysql::ObMySQLPacket &pkt, sql::ObSQLSessionInfo* session) = 0; virtual int response_packet(obmysql::ObMySQLPacket &pkt, sql::ObSQLSessionInfo* session) = 0;
virtual int send_error_packet(int err, virtual int send_error_packet(int err,
const char* errmsg, const char* errmsg,
@ -113,12 +113,7 @@ public:
void reset(); void reset();
virtual void disconnect() override; virtual void disconnect() override;
virtual void force_disconnect() override; virtual void force_disconnect() override;
virtual void update_last_pkt_pos() override virtual int update_last_pkt_pos() override;
{
if (NULL != ez_buf_) {
comp_context_.update_last_pkt_pos(ez_buf_->last);
}
}
virtual int response_packet(obmysql::ObMySQLPacket &pkt, sql::ObSQLSessionInfo* session) override; virtual int response_packet(obmysql::ObMySQLPacket &pkt, sql::ObSQLSessionInfo* session) override;
// when connect with proxy, need to append extra ok packet to last statu packet // when connect with proxy, need to append extra ok packet to last statu packet
int response_compose_packet(obmysql::ObMySQLPacket &pkt, int response_compose_packet(obmysql::ObMySQLPacket &pkt,
@ -153,12 +148,12 @@ public:
bool is_disable_response() const { return req_has_wokenup_; } bool is_disable_response() const { return req_has_wokenup_; }
int clean_buffer(); int clean_buffer();
bool has_pl(); bool has_pl();
int alloc_ezbuf();
private: private:
static const int64_t MAX_TRY_STEPS = 8; static const int64_t MAX_TRY_STEPS = 8;
static int64_t TRY_EZ_BUF_SIZES[MAX_TRY_STEPS]; static int64_t TRY_EZ_BUF_SIZES[MAX_TRY_STEPS];
int alloc_ezbuf();
int try_encode_with(obmysql::ObMySQLPacket &pkt, int try_encode_with(obmysql::ObMySQLPacket &pkt,
int64_t current_size, int64_t current_size,
int64_t &seri_size, int64_t &seri_size,

View File

@ -32,9 +32,13 @@ int ObMPStatistic::process()
// //
const common::ObString tmp_string("Active threads not support"); const common::ObString tmp_string("Active threads not support");
obmysql::OMPKString pkt(tmp_string); obmysql::OMPKString pkt(tmp_string);
packet_sender_.update_last_pkt_pos();
ObSMConnection *conn = NULL; ObSMConnection *conn = NULL;
if (OB_FAIL(response_packet(pkt, NULL))) {
if (OB_FAIL(packet_sender_.alloc_ezbuf())) {
LOG_WARN("failed to alloc easy buf", K(ret));
} else if (OB_FAIL(packet_sender_.update_last_pkt_pos())) {
LOG_WARN("failed to update last packet pos", K(ret));
} else if (OB_FAIL(response_packet(pkt, NULL))) {
RPC_OBMYSQL_LOG(WARN, "fail to response statistic packet", K(ret)); RPC_OBMYSQL_LOG(WARN, "fail to response statistic packet", K(ret));
} else if (OB_ISNULL(conn = get_conn())) { } else if (OB_ISNULL(conn = get_conn())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;

View File

@ -2755,9 +2755,12 @@ int ObMPStmtExecute::response_query_header(ObSQLSessionInfo &session, pl::ObDbms
// for obproxy, 最后一次要把 eof和OK包放一起 // for obproxy, 最后一次要把 eof和OK包放一起
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
OX (update_last_pkt_pos()); if (OB_FAIL(update_last_pkt_pos())) {
if (OB_FAIL(response_packet(eofp, &session))) { LOG_WARN("failed to update last packet pos", K(ret));
} else if (OB_FAIL(response_packet(eofp, &session))) {
LOG_WARN("response packet fail", K(ret)); LOG_WARN("response packet fail", K(ret));
} else {
// do nothing
} }
} }
// for obproxy // for obproxy

View File

@ -574,10 +574,11 @@ int ObMPStmtFetch::response_result(pl::ObPLCursorInfo &cursor,
// for proxy // for proxy
// in multi-stmt, send extra ok packet in the last stmt(has no more result) // in multi-stmt, send extra ok packet in the last stmt(has no more result)
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (!has_ok_packet()) { if (OB_FAIL(packet_sender_.alloc_ezbuf())) {
update_last_pkt_pos(); LOG_WARN("failed to alloc easy buf", K(ret));
} } else if (!has_ok_packet() && OB_FAIL(update_last_pkt_pos())) {
if (OB_FAIL(response_packet(eofp, &session))) { LOG_WARN("failed to update last packet pos", K(ret));
} else if (OB_FAIL(response_packet(eofp, &session))) {
LOG_WARN("response packet fail", K(ret)); LOG_WARN("response packet fail", K(ret));
} else if (last_row && !cursor.is_scrollable() } else if (last_row && !cursor.is_scrollable()
&& !cursor.is_streaming() && !cursor.is_streaming()

View File

@ -315,9 +315,10 @@ int ObMPStmtGetPieceData::response_result(ObSQLSessionInfo &session)
if (OB_FAIL(response_packet(piece_packet, &session))) { if (OB_FAIL(response_packet(piece_packet, &session))) {
LOG_WARN("response piece packet fail.", K(ret), K(stmt_id_), K(column_id_)); LOG_WARN("response piece packet fail.", K(ret), K(stmt_id_), K(column_id_));
} else { } else {
update_last_pkt_pos();
ObPiece *piece = NULL; ObPiece *piece = NULL;
if (OB_FAIL(piece_cache->get_piece(stmt_id_, column_id_, piece))) { if (OB_FAIL(update_last_pkt_pos())) {
LOG_WARN("failed to update last packet pos", K(ret));
} else if (OB_FAIL(piece_cache->get_piece(stmt_id_, column_id_, piece))) {
LOG_WARN("get piece fail", K(stmt_id_), K(column_id_), K(ret) ); LOG_WARN("get piece fail", K(stmt_id_), K(column_id_), K(ret) );
} else if (NULL != piece) { } else if (NULL != piece) {
uint64_t count = NULL == piece->get_buffer_array() uint64_t count = NULL == piece->get_buffer_array()

View File

@ -629,8 +629,9 @@ int ObMPStmtPrepare::send_column_packet(const ObSQLSessionInfo &session,
ret = OB_SUCCESS; ret = OB_SUCCESS;
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
packet_sender_.update_last_pkt_pos(); if (OB_FAIL(packet_sender_.update_last_pkt_pos())) {
if (OB_FAIL(send_eof_packet(session, result))) { LOG_WARN("failed to update last packet pos", K(ret));
} else if (OB_FAIL(send_eof_packet(session, result))) {
LOG_WARN("send eof field failed", K(ret)); LOG_WARN("send eof field failed", K(ret));
} }
} }