From f4061aa54c1b650c1405a667beb1e3dbae362fd4 Mon Sep 17 00:00:00 2001 From: LiuYoung00 Date: Wed, 28 Dec 2022 03:42:05 +0000 Subject: [PATCH] [to #46844254]Memory expansion caused by merge_piece_buffer multiple buffer expansions --- .../mysql/obmp_stmt_send_piece_data.cpp | 63 ++++++++++++++++--- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/src/observer/mysql/obmp_stmt_send_piece_data.cpp b/src/observer/mysql/obmp_stmt_send_piece_data.cpp index ae25c2fe4..c77baf1c7 100644 --- a/src/observer/mysql/obmp_stmt_send_piece_data.cpp +++ b/src/observer/mysql/obmp_stmt_send_piece_data.cpp @@ -332,13 +332,13 @@ int ObMPStmtSendPieceData::store_piece(ObSQLSessionInfo &session) LOG_WARN("add piece buffer fail.", K(ret), K(stmt_id_)); } else { if (is_null_) { - piece->get_is_null_map().add_member(piece->get_position()); + OZ (piece->get_is_null_map().add_member(piece->get_position())); } ObOKPParam ok_param; ok_param.affected_rows_ = 0; ok_param.is_partition_hit_ = session.partition_hit().get_bool(); ok_param.has_more_result_ = false; - if (OB_FAIL(send_ok_packet(session, ok_param))) { + if (OB_SUCC(ret) && OB_FAIL(send_ok_packet(session, ok_param))) { LOG_WARN("send ok packet fail.", K(ret), K(stmt_id_)); } } @@ -496,9 +496,12 @@ int ObPieceCache::close_all(ObSQLSessionInfo &session) ++iter) { ObPiece *piece = iter->second; int64_t key = get_piece_key(piece->get_stmt_id(), piece->get_param_id()); - if (OB_FAIL(remove_piece(key, session))) { + int64_t tmp_ret = remove_piece(key, session); + // only save first error ret + ret = ret == OB_SUCCESS ? tmp_ret : ret; + if (OB_SUCCESS != tmp_ret) { LOG_WARN("remove piece fail.", K(piece->get_stmt_id()), - K(piece->get_param_id()), K(ret)); + K(piece->get_param_id()), K(tmp_ret)); } } } @@ -607,8 +610,7 @@ int ObPieceCache::get_buffer(int32_t stmt_id, uint64_t &length, common::ObFixedArray &str_buf, char *is_null_map) { - int ret = OB_SUCCESS; - lib::is_oracle_mode() + int ret = lib::is_oracle_mode() ? get_oracle_buffer(stmt_id, param_id, count, length, str_buf, is_null_map) : get_mysql_buffer(stmt_id, param_id, length, str_buf.at(0)); return ret; @@ -764,6 +766,41 @@ int ObPieceCache::add_piece_buffer(ObPiece *piece, return ret; } +static int pre_extend_str(ObPiece *piece, + ObSqlString &str, + ObPieceBufferArray *buffer_array, + const int32_t first_piece_size, + const int64_t array_size, + bool &is_enable) +{ + int ret = OB_SUCCESS; + // may need extend str before append if piece size more then 4M + const int32_t pre_extend_thres = 4194304; + if (!is_enable) { + } else if (first_piece_size < pre_extend_thres) { + // just disable pre extend + is_enable = false; + } else { + int64_t index_pre = piece->get_position() - 1; + int64_t total_len = 0; + do { + index_pre++; + if (index_pre < 0 || index_pre >= array_size) { + break; + } + ObPieceBuffer *piece_buffer = &buffer_array->at(index_pre); + if (NULL != piece_buffer->get_piece_buffer()) { + const ObString buffer = *(piece_buffer->get_piece_buffer()); + total_len += buffer.length(); + } + } while (ObLastPiece != buffer_array->at(index_pre).get_piece_mode() + && ObInvalidPiece != buffer_array->at(index_pre).get_piece_mode()); + ret = str.extend(total_len + 1); // one more bytes for EOF + is_enable = false; + } + return ret; +} + // buf needs to allocate memory in the outer layer !!! int ObPieceCache::merge_piece_buffer(ObPiece *piece, ObSqlString &str) @@ -780,6 +817,7 @@ int ObPieceCache::merge_piece_buffer(ObPiece *piece, int64_t array_size = buffer_array->count(); int64_t index = piece->get_position() - 1; int64_t len = 0; + bool enable_pre_extern = true; do { index++; if (index < 0 || index >= array_size) { @@ -788,12 +826,17 @@ int ObPieceCache::merge_piece_buffer(ObPiece *piece, ObPieceBuffer *piece_buffer = &buffer_array->at(index); if (NULL != piece_buffer->get_piece_buffer()) { const ObString buffer = *(piece_buffer->get_piece_buffer()); - str.append(buffer); - len += buffer.length(); + // reduce alloc/free/memcpy for large buffers + OZ (pre_extend_str(piece, str, buffer_array, buffer.length(), array_size, enable_pre_extern)); + OX (str.append(buffer)); + OX (len += buffer.length()); } } while (ObLastPiece != buffer_array->at(index).get_piece_mode() - && ObInvalidPiece != buffer_array->at(index).get_piece_mode()); - if (index < 0) { + && ObInvalidPiece != buffer_array->at(index).get_piece_mode() + && OB_SUCC(ret)); + if (OB_FAIL(ret)) { + // do nothing + } else if (index < 0) { piece->set_position(0); ret = OB_ERR_UNEXPECTED; LOG_WARN("error index.", K(array_size), K(index));