[to #46638139] send long data remove error packet

This commit is contained in:
LiuYoung00
2022-12-13 06:38:02 +00:00
committed by ob-robot
parent f93ef2fe39
commit c61f55cc8e
6 changed files with 50 additions and 32 deletions

View File

@ -41,9 +41,6 @@ public:
} else {
SERVER_LOG(WARN, "MySQL command not supported", "cmd", pkt.get_cmd());
}
if (obmysql::COM_STMT_SEND_LONG_DATA == pkt.get_cmd()) {
disconnect();
}
}
return ret;
}

View File

@ -2269,6 +2269,9 @@ int ObMPStmtExecute::parse_param_value(ObIAllocator &allocator,
} else if (NULL == piece->get_allocator()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("piece allocator is null.", K(stmt_id_), K(param_id), K(ret));
} else if (OB_SUCCESS != piece->get_error_ret()) {
ret = piece->get_error_ret();
LOG_WARN("send long data has error. ", K(stmt_id_), K(param_id), K(ret));
} else {
if (OB_UNLIKELY(MYSQL_TYPE_COMPLEX == type)) {
// this must be array bounding.

View File

@ -47,7 +47,8 @@ ObMPStmtSendLongData::ObMPStmtSendLongData(const ObGlobalContext &gctx)
stmt_id_(0),
param_id_(-1),
buffer_len_(0),
buffer_()
buffer_(),
need_disconnect_(false)
{
ctx_.exec_type_ = MpQuery;
}
@ -156,10 +157,34 @@ int ObMPStmtSendLongData::process()
}
}
if (!GCONF._enable_new_sql_nio) {
// if not open sql nio , replace all ret code to 4007
ret = OB_NOT_SUPPORTED;
LOG_WARN("send long data need open SQL NIO. ", K(ret), K(stmt_id_), K(param_id_), K(need_disconnect_));
}
if (OB_FAIL(ret)) {
send_error_packet(ret, NULL);
// send long data fail will not response packet, just print log
LOG_WARN("send long data error happend ", K(ret), K(stmt_id_), K(param_id_), K(need_disconnect_));
if (!need_disconnect_) {
ObPiece *piece = NULL;
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session.get_piece_cache(false));
if (OB_ISNULL(piece_cache)) {
need_disconnect_ = true;
LOG_WARN("piece cache is null.", K(ret), K(stmt_id_), K(param_id_));
} else if (OB_SUCCESS != piece_cache->get_piece(stmt_id_, param_id_, piece)) {
need_disconnect_ = true;
LOG_WARN("get piece fail", K(stmt_id_), K(param_id_), K(ret));
} else if (NULL == piece) {
need_disconnect_ = true;
LOG_WARN("get piece fail", K(stmt_id_), K(param_id_), K(ret));
} else {
piece->set_error_ret(ret);
}
}
}
if (need_disconnect_) {
force_disconnect();
LOG_WARN("disconnect connection when send long data", K(ret));
}
session.set_last_trace_id(ObCurTraceId::get_trace_id());
@ -261,13 +286,8 @@ int ObMPStmtSendLongData::do_process(ObSQLSessionInfo &session)
//set read_only
if (OB_SUCC(ret)) {
session.set_has_exec_write_stmt(false);
} else {
bool is_partition_hit = session.partition_hit().get_bool();
int err = send_error_packet(ret, NULL, is_partition_hit);
if (OB_SUCCESS != err) { // 发送error包
LOG_WARN("send error packet failed", K(ret), K(err));
}
}
if (enable_sql_audit) {
audit_record.status_ = ret;
audit_record.client_addr_ = session.get_peer_addr();
@ -292,7 +312,8 @@ int ObMPStmtSendLongData::store_piece(ObSQLSessionInfo &session)
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session.get_piece_cache(true));
if (OB_ISNULL(piece_cache)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("piece cache is null.", K(ret), K(stmt_id_));
need_disconnect_ = true;
LOG_WARN("piece cache is null.", K(ret), K(stmt_id_), K(param_id_));
} else {
ObPiece *piece = NULL;
if (OB_FAIL(piece_cache->get_piece(stmt_id_, param_id_, piece))) {
@ -302,17 +323,16 @@ int ObMPStmtSendLongData::store_piece(ObSQLSessionInfo &session)
LOG_WARN("make piece fail.", K(ret), K(stmt_id_));
}
}
if (OB_SUCC(ret)) {
if (NULL == piece) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("piece is null.", K(ret), K(stmt_id_), K(param_id_));
} else if (OB_FAIL(piece_cache->add_piece_buffer(piece,
if (OB_FAIL(ret) || NULL == piece) {
ret = OB_SUCCESS == ret ? OB_ERR_UNEXPECTED : ret;
need_disconnect_ = true;
LOG_WARN("piece is null.", K(ret), K(piece), K(stmt_id_), K(param_id_));
} else if (OB_FAIL(piece_cache->add_piece_buffer(piece,
ObPieceMode::ObInvalidPiece,
&buffer_))) {
LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_));
} else {
// send long data do not response.
}
LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_));
} else {
// send long data do not response.
}
}
return ret;

View File

@ -77,6 +77,7 @@ private:
int16_t param_id_;
uint64_t buffer_len_;
common::ObString buffer_;
bool need_disconnect_;
private:
DISALLOW_COPY_AND_ASSIGN(ObMPStmtSendLongData);

View File

@ -160,7 +160,8 @@ public:
pos_(0),
buffer_array_(NULL),
allocator_(NULL),
is_null_map_() {}
is_null_map_(),
err_ret_(OB_SUCCESS) {}
~ObPiece() { reset(); }
void reset()
{
@ -173,6 +174,7 @@ public:
stmt_id_ = 0;
param_id_ = -1;
pos_ = 0;
err_ret_ = OB_SUCCESS;
}
void reset_buffer_array()
{
@ -204,6 +206,8 @@ public:
ObPieceBufferArray *get_buffer_array() { return buffer_array_; }
void set_buffer_array(ObPieceBufferArray *array) { buffer_array_ = array; }
int piece_init(sql::ObSQLSessionInfo &session, int32_t stmt_id, int16_t param_id);
void set_error_ret(int err_ret) { err_ret_ = err_ret; }
int get_error_ret() { return err_ret_; }
private:
int32_t stmt_id_;
int16_t param_id_;
@ -211,6 +215,7 @@ private:
ObPieceBufferArray *buffer_array_;
ObIAllocator *allocator_;
common::ObBitSet<> is_null_map_;
int err_ret_;
}; // end of class ObPiece
class ObPieceCache {

View File

@ -207,7 +207,7 @@ int ObSrvMySQLXlator::translate(rpc::ObRequest &req, ObReqProcessor *&processor)
MYSQL_PROCESSOR(ObMPStmtPrexecute, gctx_);
MYSQL_PROCESSOR(ObMPStmtSendPieceData, gctx_);
MYSQL_PROCESSOR(ObMPStmtGetPieceData, gctx_);
//MYSQL_PROCESSOR(ObMPStmtSendLongData, gctx_);
MYSQL_PROCESSOR(ObMPStmtSendLongData, gctx_);
MYSQL_PROCESSOR(ObMPResetConnection, gctx_);
// ps stmt close request may not response packet.
// Howerver, in get processor phase, it may report
@ -254,14 +254,6 @@ int ObSrvMySQLXlator::translate(rpc::ObRequest &req, ObReqProcessor *&processor)
}
break;
}
case obmysql::COM_STMT_SEND_LONG_DATA: {
if (GCONF._enable_new_sql_nio) {
NEW_MYSQL_PROCESSOR(ObMPStmtSendLongData, gctx_);
} else {
NEW_MYSQL_PROCESSOR(ObMPDefault, gctx_);
}
break;
}
default:
NEW_MYSQL_PROCESSOR(ObMPDefault, gctx_);
break;