[to #46844254]Memory expansion caused by merge_piece_buffer multiple buffer expansions

This commit is contained in:
LiuYoung00 2022-12-28 03:42:05 +00:00 committed by ob-robot
parent 7fe3d87f72
commit f4061aa54c

View File

@ -332,13 +332,13 @@ int ObMPStmtSendPieceData::store_piece(ObSQLSessionInfo &session)
LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_));
} else {
if (is_null_) {
piece->get_is_null_map().add_member(piece->get_position());
OZ (piece->get_is_null_map().add_member(piece->get_position()));
}
ObOKPParam ok_param;
ok_param.affected_rows_ = 0;
ok_param.is_partition_hit_ = session.partition_hit().get_bool();
ok_param.has_more_result_ = false;
if (OB_FAIL(send_ok_packet(session, ok_param))) {
if (OB_SUCC(ret) && OB_FAIL(send_ok_packet(session, ok_param))) {
LOG_WARN("send ok packet fail.", K(ret), K(stmt_id_));
}
}
@ -496,9 +496,12 @@ int ObPieceCache::close_all(ObSQLSessionInfo &session)
++iter) {
ObPiece *piece = iter->second;
int64_t key = get_piece_key(piece->get_stmt_id(), piece->get_param_id());
if (OB_FAIL(remove_piece(key, session))) {
int64_t tmp_ret = remove_piece(key, session);
// only save first error ret
ret = ret == OB_SUCCESS ? tmp_ret : ret;
if (OB_SUCCESS != tmp_ret) {
LOG_WARN("remove piece fail.", K(piece->get_stmt_id()),
K(piece->get_param_id()), K(ret));
K(piece->get_param_id()), K(tmp_ret));
}
}
}
@ -607,8 +610,7 @@ int ObPieceCache::get_buffer(int32_t stmt_id,
uint64_t &length,
common::ObFixedArray<ObSqlString, ObIAllocator> &str_buf,
char *is_null_map) {
int ret = OB_SUCCESS;
lib::is_oracle_mode()
int ret = lib::is_oracle_mode()
? get_oracle_buffer(stmt_id, param_id, count, length, str_buf, is_null_map)
: get_mysql_buffer(stmt_id, param_id, length, str_buf.at(0));
return ret;
@ -764,6 +766,41 @@ int ObPieceCache::add_piece_buffer(ObPiece *piece,
return ret;
}
static int pre_extend_str(ObPiece *piece,
ObSqlString &str,
ObPieceBufferArray *buffer_array,
const int32_t first_piece_size,
const int64_t array_size,
bool &is_enable)
{
int ret = OB_SUCCESS;
// may need extend str before append if piece size more then 4M
const int32_t pre_extend_thres = 4194304;
if (!is_enable) {
} else if (first_piece_size < pre_extend_thres) {
// just disable pre extend
is_enable = false;
} else {
int64_t index_pre = piece->get_position() - 1;
int64_t total_len = 0;
do {
index_pre++;
if (index_pre < 0 || index_pre >= array_size) {
break;
}
ObPieceBuffer *piece_buffer = &buffer_array->at(index_pre);
if (NULL != piece_buffer->get_piece_buffer()) {
const ObString buffer = *(piece_buffer->get_piece_buffer());
total_len += buffer.length();
}
} while (ObLastPiece != buffer_array->at(index_pre).get_piece_mode()
&& ObInvalidPiece != buffer_array->at(index_pre).get_piece_mode());
ret = str.extend(total_len + 1); // one more bytes for EOF
is_enable = false;
}
return ret;
}
// buf needs to allocate memory in the outer layer !!!
int ObPieceCache::merge_piece_buffer(ObPiece *piece,
ObSqlString &str)
@ -780,6 +817,7 @@ int ObPieceCache::merge_piece_buffer(ObPiece *piece,
int64_t array_size = buffer_array->count();
int64_t index = piece->get_position() - 1;
int64_t len = 0;
bool enable_pre_extern = true;
do {
index++;
if (index < 0 || index >= array_size) {
@ -788,12 +826,17 @@ int ObPieceCache::merge_piece_buffer(ObPiece *piece,
ObPieceBuffer *piece_buffer = &buffer_array->at(index);
if (NULL != piece_buffer->get_piece_buffer()) {
const ObString buffer = *(piece_buffer->get_piece_buffer());
str.append(buffer);
len += buffer.length();
// reduce alloc/free/memcpy for large buffers
OZ (pre_extend_str(piece, str, buffer_array, buffer.length(), array_size, enable_pre_extern));
OX (str.append(buffer));
OX (len += buffer.length());
}
} while (ObLastPiece != buffer_array->at(index).get_piece_mode()
&& ObInvalidPiece != buffer_array->at(index).get_piece_mode());
if (index < 0) {
&& ObInvalidPiece != buffer_array->at(index).get_piece_mode()
&& OB_SUCC(ret));
if (OB_FAIL(ret)) {
// do nothing
} else if (index < 0) {
piece->set_position(0);
ret = OB_ERR_UNEXPECTED;
LOG_WARN("error index.", K(array_size), K(index));