[to #54249319] fix: alter param_id_ type from int16_t to uint16_t

This commit is contained in:
haohao022
2024-01-17 09:42:53 +00:00
committed by ob-robot
parent 004042cb70
commit 37f26e7e4a
4 changed files with 41 additions and 27 deletions

View File

@ -46,7 +46,7 @@ ObMPStmtSendLongData::ObMPStmtSendLongData(const ObGlobalContext &gctx)
exec_start_timestamp_(0), exec_start_timestamp_(0),
exec_end_timestamp_(0), exec_end_timestamp_(0),
stmt_id_(0), stmt_id_(0),
param_id_(-1), param_id_(OB_MAX_PARAM_ID),
buffer_len_(0), buffer_len_(0),
buffer_(), buffer_(),
need_disconnect_(false) need_disconnect_(false)
@ -71,18 +71,22 @@ int ObMPStmtSendLongData::before_process()
const char* pos = pkt.get_cdata(); const char* pos = pkt.get_cdata();
// stmt_id // stmt_id
ObMySQLUtil::get_int4(pos, stmt_id_); ObMySQLUtil::get_int4(pos, stmt_id_);
ObMySQLUtil::get_int2(pos, param_id_); ObMySQLUtil::get_uint2(pos, param_id_);
if (stmt_id_ < 1 || param_id_ < 0) { if (OB_SUCC(ret) && stmt_id_ < 1) {
ret = OB_ERR_PARAM_INVALID; ret = OB_ERR_PARAM_INVALID;
LOG_WARN("send long data get error info.", K(stmt_id_), K(param_id_)); LOG_WARN("send_long_data receive unexpected stmt_id_", K(ret), K(stmt_id_), K(param_id_));
} else { } else if (param_id_ >= OB_PARAM_ID_OVERFLOW_RISK_THRESHOLD) {
buffer_len_ = pkt.get_clen()-7; LOG_WARN("param_id_ has the risk of overflow", K(ret), K(stmt_id_), K(param_id_));
buffer_.assign_ptr(pos, static_cast<ObString::obstr_size_t>(buffer_len_));
LOG_INFO("get info success in send long data protocol.",
K(stmt_id_), K(param_id_), K(buffer_len_), K(buffer_));
} }
LOG_INFO("send long data get param",K(stmt_id_), K(param_id_), if (OB_SUCC(ret)) {
K(buffer_len_), K(buffer_.length()), K(buffer_)); buffer_len_ = pkt.get_clen() - 7;
buffer_.assign_ptr(pos, static_cast<ObString::obstr_size_t>(buffer_len_));
LOG_INFO("resolve send_long_data protocol packet successfully",
K(stmt_id_), K(param_id_), K(buffer_len_));
LOG_DEBUG("send_long_data packet content", K(buffer_));
}
LOG_INFO("resolve send_long_data protocol packet",
K(ret), K(stmt_id_), K(param_id_), K(buffer_len_), K(buffer_.length()));
} }
return ret; return ret;
} }
@ -343,6 +347,8 @@ int ObMPStmtSendLongData::store_piece(ObSQLSessionInfo &session)
LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_)); LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_));
} else { } else {
// send long data do not response. // send long data do not response.
LOG_INFO("store piece successfully", K(ret), K(session.get_sessid()),
K(stmt_id_), K(param_id_));
} }
} }
return ret; return ret;

View File

@ -74,7 +74,7 @@ private:
int64_t exec_start_timestamp_; int64_t exec_start_timestamp_;
int64_t exec_end_timestamp_; int64_t exec_end_timestamp_;
int32_t stmt_id_; int32_t stmt_id_;
int16_t param_id_; uint16_t param_id_;
uint64_t buffer_len_; uint64_t buffer_len_;
common::ObString buffer_; common::ObString buffer_;
bool need_disconnect_; bool need_disconnect_;

View File

@ -45,7 +45,7 @@ ObMPStmtSendPieceData::ObMPStmtSendPieceData(const ObGlobalContext &gctx)
exec_start_timestamp_(0), exec_start_timestamp_(0),
exec_end_timestamp_(0), exec_end_timestamp_(0),
stmt_id_(0), stmt_id_(0),
param_id_(-1), param_id_(OB_MAX_PARAM_ID),
buffer_len_(0), buffer_len_(0),
buffer_(), buffer_(),
piece_mode_(ObInvalidPiece), piece_mode_(ObInvalidPiece),
@ -78,19 +78,23 @@ int ObMPStmtSendPieceData::before_process()
ObMySQLUtil::get_int1(pos, piece_mode_); ObMySQLUtil::get_int1(pos, piece_mode_);
int8_t is_null = 0; int8_t is_null = 0;
ObMySQLUtil::get_int1(pos, is_null); ObMySQLUtil::get_int1(pos, is_null);
1 == is_null ? is_null_ = true : is_null_ = false; is_null_ = (1 == is_null);
ObMySQLUtil::get_int8(pos, buffer_len_); ObMySQLUtil::get_int8(pos, buffer_len_);
if (stmt_id_ < 1 || param_id_ < 0 || buffer_len_ < 0) { if (stmt_id_ < 1 || buffer_len_ < 0) {
ret = OB_ERR_PARAM_INVALID; ret = OB_ERR_PARAM_INVALID;
LOG_WARN("send long data get error info.", K(stmt_id_), K(param_id_), K(buffer_len_)); LOG_WARN("send_piece receive unexpected params", K(ret), K(stmt_id_), K(buffer_len_));
} else { } else if (param_id_ >= OB_PARAM_ID_OVERFLOW_RISK_THRESHOLD) {
LOG_WARN("param_id_ has the risk of overflow", K(ret), K(stmt_id_), K(param_id_));
}
if (OB_SUCC(ret)) {
buffer_.assign_ptr(pos, static_cast<ObString::obstr_size_t>(buffer_len_)); buffer_.assign_ptr(pos, static_cast<ObString::obstr_size_t>(buffer_len_));
pos += buffer_len_; pos += buffer_len_;
LOG_DEBUG("get info success in send long data protocol.", LOG_INFO("resolve send_piece protocol packet successfully",
K(stmt_id_), K(param_id_)); K(ret), K(stmt_id_), K(param_id_), K(buffer_len_));
LOG_DEBUG("send_piece packet content", K(buffer_));
} }
LOG_DEBUG("send long data get param",K(stmt_id_), K(param_id_), LOG_INFO("resolve send_piece protocol packet",
K(piece_mode_), K(buffer_len_), K(buffer_.length())); K(ret), K(stmt_id_), K(param_id_), K(buffer_len_), K(piece_mode_), K(is_null_));
} }
return ret; return ret;
} }
@ -331,6 +335,8 @@ int ObMPStmtSendPieceData::store_piece(ObSQLSessionInfo &session)
&buffer_))) { &buffer_))) {
LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_)); LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_));
} else { } else {
LOG_INFO("store piece successfully", K(ret), K(session.get_sessid()),
K(stmt_id_), K(param_id_));
if (is_null_) { if (is_null_) {
OZ (piece->get_is_null_map().add_member(piece->get_position())); OZ (piece->get_is_null_map().add_member(piece->get_position()));
} }
@ -384,7 +390,7 @@ int ObPiece::piece_init(ObSQLSessionInfo &session,
OB_ALLOCATE_MEMORY_FAILED, sizeof(ObPieceBufferArray)); OB_ALLOCATE_MEMORY_FAILED, sizeof(ObPieceBufferArray));
OX (MEMSET(buf, 0, sizeof(ObPieceBufferArray))); OX (MEMSET(buf, 0, sizeof(ObPieceBufferArray)));
OV (OB_NOT_NULL(buf_array = new (buf) ObPieceBufferArray(alloc))); OV (OB_NOT_NULL(buf_array = new (buf) ObPieceBufferArray(alloc)));
OZ (buf_array->reserve(OB_MAX_PIECE_COUNT)); OZ (buf_array->reserve(OB_MAX_PIECE_BUFFER_COUNT));
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
set_buffer_array(buf_array); set_buffer_array(buf_array);
} else { } else {
@ -697,7 +703,7 @@ int ObPieceCache::make_piece_buffer(ObIAllocator *allocator,
OX (MEMSET(piece_mem, 0, sizeof(ObPieceBuffer))); OX (MEMSET(piece_mem, 0, sizeof(ObPieceBuffer)));
OV (OB_NOT_NULL(piece_buffer = new (piece_mem) ObPieceBuffer(allocator, mode))); OV (OB_NOT_NULL(piece_buffer = new (piece_mem) ObPieceBuffer(allocator, mode)));
CK (OB_NOT_NULL(piece_buffer)); CK (OB_NOT_NULL(piece_buffer));
OX (piece_buffer->set_piece_buffer(buf)); OZ (piece_buffer->set_piece_buffer(buf));
LOG_DEBUG("make piece buffer.", K(ret), K(mode), K(buf->length())); LOG_DEBUG("make piece buffer.", K(ret), K(mode), K(buf->length()));
return ret; return ret;
} }

View File

@ -148,15 +148,17 @@ private:
ObIAllocator *allocator_; ObIAllocator *allocator_;
}; };
#define OB_MAX_PIECE_COUNT 1024 #define OB_MAX_PIECE_BUFFER_COUNT 1024
#define OB_MAX_PARAM_ID UINT16_MAX
#define OB_PARAM_ID_OVERFLOW_RISK_THRESHOLD (OB_MAX_PARAM_ID - 4096) // imprecise estimate
typedef common::ObFixedArray<ObPieceBuffer, common::ObIAllocator> ObPieceBufferArray; typedef common::ObFixedArray<ObPieceBuffer, common::ObIAllocator> ObPieceBufferArray;
class ObPiece class ObPiece
{ {
public: public:
ObPiece() ObPiece()
: stmt_id_(0), : stmt_id_(0),
param_id_(-1), param_id_(OB_MAX_PARAM_ID),
pos_(0), pos_(0),
buffer_array_(NULL), buffer_array_(NULL),
is_null_map_(), is_null_map_(),
@ -173,7 +175,7 @@ public:
entity_ = nullptr; entity_ = nullptr;
} }
stmt_id_ = 0; stmt_id_ = 0;
param_id_ = -1; param_id_ = OB_MAX_PARAM_ID;
pos_ = 0; pos_ = 0;
err_ret_ = OB_SUCCESS; err_ret_ = OB_SUCCESS;
} }