[CP] [to #53858480, #54677465] free pieces in piece_cache_ synchrounously

This commit is contained in:
obdev
2024-02-20 09:15:50 +00:00
committed by ob-robot
parent d1d2c34d47
commit 661031247f
9 changed files with 46 additions and 42 deletions

View File

@ -133,7 +133,7 @@ void ObSqlEndTransCb::callback(int cb_param)
session_info->reset_warnings_buf();
}
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session_info->get_piece_cache());
ObPieceCache *piece_cache = session_info->get_piece_cache();
if (OB_ISNULL(piece_cache)) {
// do nothing
// piece_cache not be null in piece data protocol

View File

@ -1913,27 +1913,32 @@ int ObMPStmtExecute::process()
}
session.check_and_reset_retry_info(*cur_trace_id, THIS_WORKER.need_retry());
session.set_last_trace_id(ObCurTraceId::get_trace_id());
// whether the previous error was reported, a cleanup is to be done here
if (!async_resp_used) {
// async remove in ObSqlEndTransCb
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session.get_piece_cache());
if (OB_ISNULL(piece_cache)) {
// do nothing
// piece_cache not be null in piece data protocol
} else {
if (!retry_ctrl_.need_retry()) {
// if no retry would be performed any more, clear the piece cache
ObPieceCache *piece_cache = nullptr;
int upper_scope_ret = ret;
ret = OB_SUCCESS;
piece_cache = session.get_piece_cache();
if (OB_NOT_NULL(piece_cache)) {
for (uint64_t i = 0; OB_SUCC(ret) && i < params_num_; i++) {
if (OB_FAIL(piece_cache->remove_piece(
piece_cache->get_piece_key(stmt_id_, i),
session))) {
piece_cache->get_piece_key(stmt_id_, i), session))) {
if (OB_HASH_NOT_EXIST == ret) {
ret = OB_SUCCESS;
LOG_INFO("piece hash not exist", K(ret), K(stmt_id_), K(i));
} else {
LOG_WARN("remove piece fail", K(stmt_id_), K(i), K(ret));
need_disconnect = true;
LOG_WARN("remove piece fail", K(ret), K(need_disconnect), K(stmt_id_), K(i));
}
}
}
} else {
LOG_DEBUG("piece_cache_ is null");
}
ret = upper_scope_ret;
}
record_flt_trace(session);
}
@ -2510,9 +2515,8 @@ int ObMPStmtExecute::parse_param_value(ObIAllocator &allocator,
uint64_t count = 1;
common::ObFixedArray<ObSqlString, ObIAllocator>
str_buf(THIS_WORKER.get_sql_arena_allocator());
ObPieceCache *piece_cache = NULL == ctx_.session_info_
? NULL
: static_cast<ObPieceCache*>(ctx_.session_info_->get_piece_cache());
ObPieceCache *piece_cache =
NULL == ctx_.session_info_ ? NULL : ctx_.session_info_->get_piece_cache();
ObPiece *piece = NULL;
if (OB_NOT_NULL(piece_cache) && OB_FAIL(piece_cache->get_piece(stmt_id_, param_id, piece))) {
ret = OB_ERR_UNEXPECTED;

View File

@ -784,7 +784,7 @@ int ObMPStmtFetch::response_row(ObSQLSessionInfo &session,
{
int ret = OB_SUCCESS;
common::ObNewRow row;
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session.get_piece_cache(true));
ObPieceCache *piece_cache = session.get_piece_cache(true);
if (OB_ISNULL(piece_cache)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("piece cache is null.", K(ret), K(stmt_id));

View File

@ -309,7 +309,7 @@ int ObMPStmtGetPieceData::response_result(ObSQLSessionInfo &session)
{
int ret = OB_SUCCESS;
ObPieceBuffer piece_buf;
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session.get_piece_cache());
ObPieceCache *piece_cache = session.get_piece_cache();
if (OB_ISNULL(piece_cache)) {
// must be init in fetch
ret = OB_ERR_UNEXPECTED;

View File

@ -78,7 +78,7 @@ int ObMPStmtReset::process()
} else if (OB_FAIL(update_transmission_checksum_flag(*session))) {
LOG_WARN("update transmisson checksum flag failed", K(ret));
} else {
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session->get_piece_cache());
ObPieceCache *piece_cache = session->get_piece_cache();
int64_t param_num = 0;
THIS_WORKER.set_session(session);
ObSQLSessionInfo::LockGuard lock_guard(session->get_query_lock());

View File

@ -176,7 +176,7 @@ int ObMPStmtSendLongData::process()
if (!need_disconnect_) {
ObPiece *piece = NULL;
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session.get_piece_cache(false));
ObPieceCache *piece_cache = session.get_piece_cache(false);
if (OB_ISNULL(piece_cache)) {
need_disconnect_ = true;
LOG_WARN("piece cache is null.", K(ret), K(stmt_id_), K(param_id_));
@ -323,7 +323,7 @@ int ObMPStmtSendLongData::do_process(ObSQLSessionInfo &session)
int ObMPStmtSendLongData::store_piece(ObSQLSessionInfo &session)
{
int ret = OB_SUCCESS;
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session.get_piece_cache(true));
ObPieceCache *piece_cache = session.get_piece_cache(true);
if (OB_ISNULL(piece_cache)) {
ret = OB_ERR_UNEXPECTED;
need_disconnect_ = true;

View File

@ -313,7 +313,7 @@ int ObMPStmtSendPieceData::do_process(ObSQLSessionInfo &session)
int ObMPStmtSendPieceData::store_piece(ObSQLSessionInfo &session)
{
int ret = OB_SUCCESS;
ObPieceCache *piece_cache = static_cast<ObPieceCache*>(session.get_piece_cache(true));
ObPieceCache *piece_cache = session.get_piece_cache(true);
if (OB_ISNULL(piece_cache)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("piece cache is null.", K(ret), K(stmt_id_));
@ -369,22 +369,23 @@ int ObPiece::piece_init(ObSQLSessionInfo &session,
int32_t stmt_id,
uint16_t param_id) {
int ret = OB_SUCCESS;
lib::ContextParam param;
ObPieceCache* piece_cache = nullptr;
set_stmt_id(stmt_id);
set_param_id(param_id);
lib::MemoryContext entity = NULL;
lib::ContextParam param;
param.set_mem_attr(session.get_effective_tenant_id(),
ObModIds::OB_PL_TEMP, ObCtxIds::DEFAULT_CTX_ID);
param.set_page_size(OB_MALLOC_NORMAL_BLOCK_SIZE);
if (OB_FAIL((static_cast<ObPieceCache*>(session.get_piece_cache()))
->mem_context_->CREATE_CONTEXT(entity_, param))) {
LOG_WARN("failed to create ref cursor entity", K(ret));
param.set_page_size(OB_MALLOC_NORMAL_BLOCK_SIZE)
.set_mem_attr(session.get_effective_tenant_id(), "SendPieceProto", ObCtxIds::DEFAULT_CTX_ID);
if (OB_ISNULL(piece_cache = session.get_piece_cache())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("piece cache is null", K(ret));
} else if (OB_FAIL(piece_cache->mem_context_->CREATE_CONTEXT(entity_, param))) {
LOG_WARN("failed to create piece memory context", K(ret));
} else if (OB_ISNULL(entity_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc ref cursor entity", K(ret));
LOG_WARN("failed to alloc piece memory context", K(ret));
} else {
void *buf = NULL;
ObPieceBufferArray *buf_array = NULL;
void *buf = nullptr;
ObPieceBufferArray *buf_array = nullptr;
ObIAllocator *alloc = &entity_->get_arena_allocator();
OV (OB_NOT_NULL(buf = alloc->alloc(sizeof(ObPieceBufferArray))),
OB_ALLOCATE_MEMORY_FAILED, sizeof(ObPieceBufferArray));

View File

@ -639,11 +639,10 @@ void ObSQLSessionInfo::destroy(bool skip_sys_var)
}
if (OB_SUCC(ret) && NULL != piece_cache_) {
if (OB_FAIL((static_cast<observer::ObPieceCache*>(piece_cache_))
->close_all(*this))) {
if (OB_FAIL(piece_cache_->close_all(*this))) {
LOG_WARN("failed to close all piece", K(ret));
}
static_cast<observer::ObPieceCache*>(piece_cache_)->~ObPieceCache();
piece_cache_->~ObPieceCache();
get_session_allocator().free(piece_cache_);
piece_cache_ = NULL;
}
@ -2860,15 +2859,14 @@ int ObSQLSessionInfo::ps_use_stream_result_set(bool &use_stream) {
return ret;
}
void* ObSQLSessionInfo::get_piece_cache(bool need_init) {
observer::ObPieceCache* ObSQLSessionInfo::get_piece_cache(bool need_init) {
if (NULL == piece_cache_ && need_init) {
void *buf = get_session_allocator().alloc(sizeof(observer::ObPieceCache));
if (NULL != buf) {
MEMSET(buf, 0, sizeof(observer::ObPieceCache));
piece_cache_ = new (buf) observer::ObPieceCache();
if (OB_SUCCESS != (static_cast<observer::ObPieceCache*>(piece_cache_))->init(
get_effective_tenant_id())) {
static_cast<observer::ObPieceCache*>(piece_cache_)->~ObPieceCache();
if (OB_SUCCESS != piece_cache_->init(get_effective_tenant_id())) {
piece_cache_->~ObPieceCache();
get_session_allocator().free(piece_cache_);
piece_cache_ = NULL;
LOG_WARN_RET(OB_ERR_UNEXPECTED, "init piece cache fail");

View File

@ -53,6 +53,7 @@ namespace observer
{
class ObQueryDriver;
class ObSqlEndTransCb;
class ObPieceCache;
}
namespace pl
{
@ -1318,7 +1319,7 @@ public:
bool is_ignore_stmt() const { return is_ignore_stmt_; }
// piece
void *get_piece_cache(bool need_init = false);
observer::ObPieceCache *get_piece_cache(bool need_init = false);
void set_load_data_exec_session(bool v) { is_load_data_exec_session_ = v; }
bool is_load_data_exec_session() const { return is_load_data_exec_session_; }
@ -1472,7 +1473,7 @@ private:
bool is_ignore_stmt_;
ObSessionDDLInfo ddl_info_;
bool is_table_name_hidden_;
void *piece_cache_;
observer::ObPieceCache* piece_cache_;
bool is_load_data_exec_session_;
ObSqlString pl_exact_err_msg_;
bool is_varparams_sql_prepare_;