diff --git a/src/observer/mysql/ob_mysql_end_trans_cb.cpp b/src/observer/mysql/ob_mysql_end_trans_cb.cpp index 1d449cceda..430c835209 100644 --- a/src/observer/mysql/ob_mysql_end_trans_cb.cpp +++ b/src/observer/mysql/ob_mysql_end_trans_cb.cpp @@ -133,7 +133,7 @@ void ObSqlEndTransCb::callback(int cb_param) session_info->reset_warnings_buf(); } - ObPieceCache *piece_cache = static_cast(session_info->get_piece_cache()); + ObPieceCache *piece_cache = session_info->get_piece_cache(); if (OB_ISNULL(piece_cache)) { // do nothing // piece_cache not be null in piece data protocol diff --git a/src/observer/mysql/obmp_stmt_execute.cpp b/src/observer/mysql/obmp_stmt_execute.cpp index 1021470816..1fdece55bc 100644 --- a/src/observer/mysql/obmp_stmt_execute.cpp +++ b/src/observer/mysql/obmp_stmt_execute.cpp @@ -1913,27 +1913,32 @@ int ObMPStmtExecute::process() } session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry()); session.set_last_trace_id(ObCurTraceId::get_trace_id()); - // whether the previous error was reported, a cleanup is to be done here - if (!async_resp_used) { - // async remove in ObSqlEndTransCb - ObPieceCache *piece_cache = static_cast(session.get_piece_cache()); - if (OB_ISNULL(piece_cache)) { - // do nothing - // piece_cache not be null in piece data protocol - } else { + + if (!retry_ctrl_.need_retry()) { + // if no retry would be performed any more, clear the piece cache + ObPieceCache *piece_cache = nullptr; + int upper_scope_ret = ret; + ret = OB_SUCCESS; + piece_cache = session.get_piece_cache(); + if (OB_NOT_NULL(piece_cache)) { for (uint64_t i = 0; OB_SUCC(ret) && i < params_num_; i++) { if (OB_FAIL(piece_cache->remove_piece( - piece_cache->get_piece_key(stmt_id_, i), - session))) { + piece_cache->get_piece_key(stmt_id_, i), session))) { if (OB_HASH_NOT_EXIST == ret) { ret = OB_SUCCESS; + LOG_INFO("piece hash not exist", K(ret), K(stmt_id_), K(i)); } else { - LOG_WARN("remove piece fail", K(stmt_id_), K(i), K(ret)); + need_disconnect = true; + LOG_WARN("remove piece fail", K(ret), K(need_disconnect), K(stmt_id_), K(i)); } } } + } else { + LOG_DEBUG("piece_cache_ is null"); } + ret = upper_scope_ret; } + record_flt_trace(session); } @@ -2510,9 +2515,8 @@ int ObMPStmtExecute::parse_param_value(ObIAllocator &allocator, uint64_t count = 1; common::ObFixedArray str_buf(THIS_WORKER.get_sql_arena_allocator()); - ObPieceCache *piece_cache = NULL == ctx_.session_info_ - ? NULL - : static_cast(ctx_.session_info_->get_piece_cache()); + ObPieceCache *piece_cache = + NULL == ctx_.session_info_ ? NULL : ctx_.session_info_->get_piece_cache(); ObPiece *piece = NULL; if (OB_NOT_NULL(piece_cache) && OB_FAIL(piece_cache->get_piece(stmt_id_, param_id, piece))) { ret = OB_ERR_UNEXPECTED; diff --git a/src/observer/mysql/obmp_stmt_fetch.cpp b/src/observer/mysql/obmp_stmt_fetch.cpp index 5ec6726275..58a14b2d9b 100644 --- a/src/observer/mysql/obmp_stmt_fetch.cpp +++ b/src/observer/mysql/obmp_stmt_fetch.cpp @@ -784,7 +784,7 @@ int ObMPStmtFetch::response_row(ObSQLSessionInfo &session, { int ret = OB_SUCCESS; common::ObNewRow row; - ObPieceCache *piece_cache = static_cast(session.get_piece_cache(true)); + ObPieceCache *piece_cache = session.get_piece_cache(true); if (OB_ISNULL(piece_cache)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("piece cache is null.", K(ret), K(stmt_id)); diff --git a/src/observer/mysql/obmp_stmt_get_piece_data.cpp b/src/observer/mysql/obmp_stmt_get_piece_data.cpp index 337232469b..795d4f8ea6 100644 --- a/src/observer/mysql/obmp_stmt_get_piece_data.cpp +++ b/src/observer/mysql/obmp_stmt_get_piece_data.cpp @@ -309,7 +309,7 @@ int ObMPStmtGetPieceData::response_result(ObSQLSessionInfo &session) { int ret = OB_SUCCESS; ObPieceBuffer piece_buf; - ObPieceCache *piece_cache = static_cast(session.get_piece_cache()); + ObPieceCache *piece_cache = session.get_piece_cache(); if (OB_ISNULL(piece_cache)) { // must be init in fetch ret = OB_ERR_UNEXPECTED; diff --git a/src/observer/mysql/obmp_stmt_reset.cpp b/src/observer/mysql/obmp_stmt_reset.cpp index 3f1f7924ad..ebec7ed3cb 100644 --- a/src/observer/mysql/obmp_stmt_reset.cpp +++ b/src/observer/mysql/obmp_stmt_reset.cpp @@ -78,7 +78,7 @@ int ObMPStmtReset::process() } else if (OB_FAIL(update_transmission_checksum_flag(*session))) { LOG_WARN("update transmisson checksum flag failed", K(ret)); } else { - ObPieceCache *piece_cache = static_cast(session->get_piece_cache()); + ObPieceCache *piece_cache = session->get_piece_cache(); int64_t param_num = 0; THIS_WORKER.set_session(session); ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock()); diff --git a/src/observer/mysql/obmp_stmt_send_long_data.cpp b/src/observer/mysql/obmp_stmt_send_long_data.cpp index d5bd6f2030..fc6e5f7fdd 100644 --- a/src/observer/mysql/obmp_stmt_send_long_data.cpp +++ b/src/observer/mysql/obmp_stmt_send_long_data.cpp @@ -176,7 +176,7 @@ int ObMPStmtSendLongData::process() if (!need_disconnect_) { ObPiece *piece = NULL; - ObPieceCache *piece_cache = static_cast(session.get_piece_cache(false)); + ObPieceCache *piece_cache = session.get_piece_cache(false); if (OB_ISNULL(piece_cache)) { need_disconnect_ = true; LOG_WARN("piece cache is null.", K(ret), K(stmt_id_), K(param_id_)); @@ -323,7 +323,7 @@ int ObMPStmtSendLongData::do_process(ObSQLSessionInfo &session) int ObMPStmtSendLongData::store_piece(ObSQLSessionInfo &session) { int ret = OB_SUCCESS; - ObPieceCache *piece_cache = static_cast(session.get_piece_cache(true)); + ObPieceCache *piece_cache = session.get_piece_cache(true); if (OB_ISNULL(piece_cache)) { ret = OB_ERR_UNEXPECTED; need_disconnect_ = true; diff --git a/src/observer/mysql/obmp_stmt_send_piece_data.cpp b/src/observer/mysql/obmp_stmt_send_piece_data.cpp index 30d2eafe43..a008515772 100644 --- a/src/observer/mysql/obmp_stmt_send_piece_data.cpp +++ b/src/observer/mysql/obmp_stmt_send_piece_data.cpp @@ -313,7 +313,7 @@ int ObMPStmtSendPieceData::do_process(ObSQLSessionInfo &session) int ObMPStmtSendPieceData::store_piece(ObSQLSessionInfo &session) { int ret = OB_SUCCESS; - ObPieceCache *piece_cache = static_cast(session.get_piece_cache(true)); + ObPieceCache *piece_cache = session.get_piece_cache(true); if (OB_ISNULL(piece_cache)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("piece cache is null.", K(ret), K(stmt_id_)); @@ -369,22 +369,23 @@ int ObPiece::piece_init(ObSQLSessionInfo &session, int32_t stmt_id, uint16_t param_id) { int ret = OB_SUCCESS; + lib::ContextParam param; + ObPieceCache* piece_cache = nullptr; set_stmt_id(stmt_id); set_param_id(param_id); - lib::MemoryContext entity = NULL; - lib::ContextParam param; - param.set_mem_attr(session.get_effective_tenant_id(), - ObModIds::OB_PL_TEMP, ObCtxIds::DEFAULT_CTX_ID); - param.set_page_size(OB_MALLOC_NORMAL_BLOCK_SIZE); - if (OB_FAIL((static_cast(session.get_piece_cache())) - ->mem_context_->CREATE_CONTEXT(entity_, param))) { - LOG_WARN("failed to create ref cursor entity", K(ret)); + param.set_page_size(OB_MALLOC_NORMAL_BLOCK_SIZE) + .set_mem_attr(session.get_effective_tenant_id(), "SendPieceProto", ObCtxIds::DEFAULT_CTX_ID); + if (OB_ISNULL(piece_cache = session.get_piece_cache())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("piece cache is null", K(ret)); + } else if (OB_FAIL(piece_cache->mem_context_->CREATE_CONTEXT(entity_, param))) { + LOG_WARN("failed to create piece memory context", K(ret)); } else if (OB_ISNULL(entity_)) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to alloc ref cursor entity", K(ret)); + LOG_WARN("failed to alloc piece memory context", K(ret)); } else { - void *buf = NULL; - ObPieceBufferArray *buf_array = NULL; + void *buf = nullptr; + ObPieceBufferArray *buf_array = nullptr; ObIAllocator *alloc = &entity_->get_arena_allocator(); OV (OB_NOT_NULL(buf = alloc->alloc(sizeof(ObPieceBufferArray))), OB_ALLOCATE_MEMORY_FAILED, sizeof(ObPieceBufferArray)); @@ -392,7 +393,7 @@ int ObPiece::piece_init(ObSQLSessionInfo &session, OV (OB_NOT_NULL(buf_array = new (buf) ObPieceBufferArray(alloc))); OZ (buf_array->reserve(OB_MAX_PIECE_BUFFER_COUNT)); if (OB_SUCC(ret)) { - set_buffer_array(buf_array); + set_buffer_array(buf_array); } else { ret = OB_ERR_UNEXPECTED; LOG_WARN("alloc buffer array fail.", K(ret), K(stmt_id), K(param_id)); diff --git a/src/sql/session/ob_sql_session_info.cpp b/src/sql/session/ob_sql_session_info.cpp index 26113c8c46..e99bab3a4b 100644 --- a/src/sql/session/ob_sql_session_info.cpp +++ b/src/sql/session/ob_sql_session_info.cpp @@ -639,11 +639,10 @@ void ObSQLSessionInfo::destroy(bool skip_sys_var) } if (OB_SUCC(ret) && NULL != piece_cache_) { - if (OB_FAIL((static_cast(piece_cache_)) - ->close_all(*this))) { + if (OB_FAIL(piece_cache_->close_all(*this))) { LOG_WARN("failed to close all piece", K(ret)); } - static_cast(piece_cache_)->~ObPieceCache(); + piece_cache_->~ObPieceCache(); get_session_allocator().free(piece_cache_); piece_cache_ = NULL; } @@ -2860,15 +2859,14 @@ int ObSQLSessionInfo::ps_use_stream_result_set(bool &use_stream) { return ret; } -void* ObSQLSessionInfo::get_piece_cache(bool need_init) { +observer::ObPieceCache* ObSQLSessionInfo::get_piece_cache(bool need_init) { if (NULL == piece_cache_ && need_init) { void *buf = get_session_allocator().alloc(sizeof(observer::ObPieceCache)); if (NULL != buf) { MEMSET(buf, 0, sizeof(observer::ObPieceCache)); piece_cache_ = new (buf) observer::ObPieceCache(); - if (OB_SUCCESS != (static_cast(piece_cache_))->init( - get_effective_tenant_id())) { - static_cast(piece_cache_)->~ObPieceCache(); + if (OB_SUCCESS != piece_cache_->init(get_effective_tenant_id())) { + piece_cache_->~ObPieceCache(); get_session_allocator().free(piece_cache_); piece_cache_ = NULL; LOG_WARN_RET(OB_ERR_UNEXPECTED, "init piece cache fail"); diff --git a/src/sql/session/ob_sql_session_info.h b/src/sql/session/ob_sql_session_info.h index 7768ec8ee9..7a7ae0f862 100644 --- a/src/sql/session/ob_sql_session_info.h +++ b/src/sql/session/ob_sql_session_info.h @@ -53,6 +53,7 @@ namespace observer { class ObQueryDriver; class ObSqlEndTransCb; +class ObPieceCache; } namespace pl { @@ -1318,7 +1319,7 @@ public: bool is_ignore_stmt() const { return is_ignore_stmt_; } // piece - void *get_piece_cache(bool need_init = false); + observer::ObPieceCache *get_piece_cache(bool need_init = false); void set_load_data_exec_session(bool v) { is_load_data_exec_session_ = v; } bool is_load_data_exec_session() const { return is_load_data_exec_session_; } @@ -1472,7 +1473,7 @@ private: bool is_ignore_stmt_; ObSessionDDLInfo ddl_info_; bool is_table_name_hidden_; - void *piece_cache_; + observer::ObPieceCache* piece_cache_; bool is_load_data_exec_session_; ObSqlString pl_exact_err_msg_; bool is_varparams_sql_prepare_;