diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index c7617eb7b0..f6a602aa85 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -132,6 +132,7 @@ #include "rootserver/ob_heartbeat_service.h" #include "share/detect/ob_detect_manager.h" #include "observer/table/ttl/ob_ttl_service.h" +#include "sql/dtl/ob_dtl_interm_result_manager.h" #ifdef ERRSIM #include "share/errsim_module/ob_tenant_errsim_module_mgr.h" #include "share/errsim_module/ob_tenant_errsim_event_mgr.h" @@ -515,6 +516,8 @@ int ObMultiTenant::init(ObAddr myaddr, MTL_BIND2(server_obj_pool_mtl_new, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy); MTL_BIND(ObDetectManager::mtl_init, ObDetectManager::mtl_destroy); MTL_BIND(ObTenantSQLSessionMgr::mtl_init, ObTenantSQLSessionMgr::mtl_destroy); + MTL_BIND2(mtl_new_default, ObDTLIntermResultManager::mtl_init, ObDTLIntermResultManager::mtl_start, + ObDTLIntermResultManager::mtl_stop, ObDTLIntermResultManager::mtl_wait, ObDTLIntermResultManager::mtl_destroy); if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) { MTL_BIND2(nullptr, nullptr, start_mysql_queue, mtl_stop_default, mtl_wait_default, mtl_destroy_default); @@ -1687,11 +1690,6 @@ int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &remove_tenant_s LOG_WARN("failed to delete_tenant_usage_stat", K(ret), K(tenant_id)); } } - if (OB_SUCC(ret)) { - if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().erase_tenant_interm_result_info(tenant_id))) { - LOG_WARN("failed to erase_tenant_interm_result_info", K(ret), K(tenant_id)); - } - } if (OB_SUCC(ret)) { // only report event when ret = success diff --git a/src/observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.cpp b/src/observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.cpp index fc52021bb3..772c9836f2 100644 --- a/src/observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.cpp +++ b/src/observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.cpp @@ -10,12 +10,14 @@ * See the Mulan PubL v2 for more details. */ +#include "observer/omt/ob_tenant.h" +#include "observer/omt/ob_multi_tenant.h" #include "observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.h" #include "observer/ob_server_utils.h" -#include "sql/dtl/ob_dtl_interm_result_manager.h" #include "sql/session/ob_sql_session_info.h" #include "lib/hash/ob_hashmap.h" #include "sql/session/ob_sql_session_info.h" +#include "sql/dtl/ob_dtl_interm_result_manager.h" namespace oceanbase { @@ -228,11 +230,35 @@ int ObAllDtlIntermResultMonitor::fill_scanner() } else if (OB_FAIL(ObServerUtils::get_server_ip(allocator_, ipstr))) { SERVER_LOG(ERROR, "get server ip failed", K(ret)); } else { - ObDTLIntermResultMonitorInfoGetter monitor_getter(scanner_, *allocator_, output_column_ids_, - cur_row_, *addr_, ipstr, session_->get_effective_tenant_id()); - if (OB_FAIL(ObDTLIntermResultManager::getInstance().generate_monitor_info_rows(monitor_getter))) { - SERVER_LOG(WARN, "generate monitor info array failed", K(ret)); + uint64_t cur_tenant_id = MTL_ID(); + if(is_sys_tenant(cur_tenant_id)) { + omt::TenantList &list = GCTX.omt_->get_tenant_list(); + uint64_t tmp_tenant_id = 0; + for (omt::TenantList::iterator it = list.begin(); it != list.end() && OB_SUCC(ret); it++) { + tmp_tenant_id = (*it)->id(); + if(!is_virtual_tenant_id(tmp_tenant_id)) { + ObDTLIntermResultMonitorInfoGetter monitor_getter(scanner_, *allocator_, output_column_ids_, + cur_row_, *addr_, ipstr, tmp_tenant_id); + MTL_SWITCH(tmp_tenant_id) { + if (OB_FAIL(MTL(ObDTLIntermResultManager*)->generate_monitor_info_rows(monitor_getter))) { + SERVER_LOG(WARN, "generate monitor info array failed", K(ret)); + } + } + } + } + } else if(is_user_tenant(cur_tenant_id)) { + ObDTLIntermResultMonitorInfoGetter monitor_getter(scanner_, *allocator_, output_column_ids_, + cur_row_, *addr_, ipstr, cur_tenant_id); + MTL_SWITCH(cur_tenant_id) { + if (OB_FAIL(MTL(ObDTLIntermResultManager*)->generate_monitor_info_rows(monitor_getter))) { + SERVER_LOG(WARN, "generate monitor info array failed", K(ret)); + } + } } else { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(WARN, "Non-system non-user tenants try generate_monitor_info_rows", K(ret)); + } + if(OB_SUCC(ret)) { scanner_it_ = scanner_.begin(); start_to_read_ = true; } diff --git a/src/share/detect/ob_detect_callback.cpp b/src/share/detect/ob_detect_callback.cpp index a8b3c39d6d..17e764695c 100644 --- a/src/share/detect/ob_detect_callback.cpp +++ b/src/share/detect/ob_detect_callback.cpp @@ -259,7 +259,7 @@ int ObSqcDetectCB::do_callback() int ObSingleDfoDetectCB::do_callback() { int ret = OB_SUCCESS; - ret= sql::dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(key_, false); + ret = MTL(sql::dtl::ObDTLIntermResultManager*)->erase_interm_result_info(key_, false); ret = ret == OB_HASH_NOT_EXIST ? OB_SUCCESS : ret; LIB_LOG(WARN, "[DM] single dfo erase_interm_result_info", K(ret), K(key_), K_(trace_id)); return ret; @@ -268,7 +268,7 @@ int ObSingleDfoDetectCB::do_callback() int ObTempTableDetectCB::do_callback() { int ret = OB_SUCCESS; - ret = sql::dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(key_, false); + ret = MTL(sql::dtl::ObDTLIntermResultManager*)->erase_interm_result_info(key_, false); ret = ret == OB_HASH_NOT_EXIST ? OB_SUCCESS : ret; LIB_LOG(WARN, "[DM] temp table erase_interm_result_info", K(ret), K(key_), K_(trace_id)); return ret; diff --git a/src/share/rc/ob_tenant_base.h b/src/share/rc/ob_tenant_base.h index 9e2f3feaf3..588753b177 100755 --- a/src/share/rc/ob_tenant_base.h +++ b/src/share/rc/ob_tenant_base.h @@ -46,7 +46,10 @@ namespace obmysql { class ObSqlNioServer; } namespace sql { - namespace dtl { class ObTenantDfc; } + namespace dtl { + class ObTenantDfc; + class ObDTLIntermResultManager; + } class ObTenantSQLSessionMgr; class ObTenantSqlMemoryManager; class ObPlanMonitorNodeList; @@ -277,6 +280,7 @@ using ObTableScanIteratorObjPool = common::ObServerObjectPoolseq_no() = 1; buffer->pos() = 0; if (use_interm_result_) { - if (OB_FAIL(ObDTLIntermResultManager::process_interm_result(buffer, id_))) { + if (OB_FAIL(MTL(ObDTLIntermResultManager*)->process_interm_result(buffer, id_))) { LOG_WARN("fail to process internal result", K(ret)); } } else { @@ -663,30 +663,32 @@ int ObDtlBasicChannel::process1( ObDTLIntermResultKey key; key.channel_id_ = id_; key.batch_id_ = batch_id_; - if (channel_is_eof_) { - ret = OB_EAGAIN; - } else if (OB_FAIL(ObDTLIntermResultManager::getInstance().atomic_get_interm_result_info( - key, result_info_guard_))) { - if (is_px_channel()) { + MTL_SWITCH(tenant_id_) { + if (channel_is_eof_) { ret = OB_EAGAIN; - } else if (ignore_error()) { - ret = OB_SUCCESS; + } else if (OB_FAIL(MTL(ObDTLIntermResultManager*)->atomic_get_interm_result_info( + key, result_info_guard_))) { + if (is_px_channel()) { + ret = OB_EAGAIN; + } else if (ignore_error()) { + ret = OB_SUCCESS; + } else { + LOG_WARN("fail to get row store", K(ret)); + } + LOG_TRACE("fail to get row store", K(ret), K(key.batch_id_), K(key.channel_id_)); + } else if (FALSE_IT(result_info = result_info_guard_.result_info_)) { + } else if (OB_SUCCESS != result_info->ret_) { + ret = result_info->ret_; + LOG_WARN("the interm result info meet a error", K(ret)); + } else if (!result_info->is_store_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("there is no row store in internal result", K(ret)); + } else if (OB_FAIL(DTL_IR_STORE_DO(*result_info, finish_add_row, true))) { + LOG_WARN("failed to finish add row", K(ret)); } else { - LOG_WARN("fail to get row store", K(ret)); - } - LOG_TRACE("fail to get row store", K(ret), K(key.batch_id_), K(key.channel_id_)); - } else if (FALSE_IT(result_info = result_info_guard_.result_info_)) { - } else if (OB_SUCCESS != result_info->ret_) { - ret = result_info->ret_; - LOG_WARN("the interm result info meet a error", K(ret)); - } else if (!result_info->is_store_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("there is no row store in internal result", K(ret)); - } else if (OB_FAIL(DTL_IR_STORE_DO(*result_info, finish_add_row, true))) { - LOG_WARN("failed to finish add row", K(ret)); - } else { - if (OB_FAIL(result_info->datum_store_->begin(datum_iter_))) { - LOG_WARN("begin iterator failed", K(ret)); + if (OB_FAIL(result_info->datum_store_->begin(datum_iter_))) { + LOG_WARN("begin iterator failed", K(ret)); + } } } if (OB_SUCC(ret) && !channel_is_eof()) { diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.cpp b/src/sql/dtl/ob_dtl_interm_result_manager.cpp index 2a8fdd7c03..6be9bbfed2 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.cpp +++ b/src/sql/dtl/ob_dtl_interm_result_manager.cpp @@ -11,6 +11,9 @@ */ #define USING_LOG_PREFIX SQL_DTL +#include "observer/omt/ob_tenant.h" +#include "observer/omt/ob_multi_tenant.h" +#include "observer/ob_server_struct.h" #include "ob_dtl_interm_result_manager.h" #include "storage/blocksstable/ob_tmp_file.h" #include "observer/virtual_table/ob_all_virtual_dtl_interm_result_monitor.h" @@ -23,45 +26,6 @@ using namespace common; using namespace sql; using namespace oceanbase::sql::dtl; - - -void ObDTLIntermResultGC::runTimerTask() -{ - int ret = OB_SUCCESS; - cur_time_ = oceanbase::common::ObTimeUtility::current_time(); - expire_keys_.reset(); - - // 之前放到了foreach里面,也就是每个元素会++一次,那这样, - // 如果缓存是一个比较大的质数,则可能需要好久,而且每次都在变,所以可能基本上走不进去,bigbig bug - ++dump_count_; - interm_cnt_ = 0; - // dump 每隔10秒 && 未超时 && 未使用 的row_store - if (OB_SUCC(ret)) { - if (OB_FAIL(ObDTLIntermResultManager::getInstance().dump_result_info(*this))) { - LOG_WARN("fail to for each row store", K(ret)); - } else { - int64_t dump_cost = oceanbase::common::ObTimeUtility::current_time() - cur_time_; - LOG_INFO("dump dtl interm result cost(us)", K(dump_cost), K(ret), - "interm count", interm_cnt_, "dump count", dump_count_); - } - } - - clean_cnt_ = 0; - interm_cnt_ = 0; - cur_time_ = oceanbase::common::ObTimeUtility::current_time(); - // 清理超时row_store - if (OB_SUCC(ret)) { - if (OB_FAIL(ObDTLIntermResultManager::getInstance().clear_timeout_result_info(*this))) { - LOG_WARN("fail to for each row store", K(ret)); - } else { - int64_t clear_cost = oceanbase::common::ObTimeUtility::current_time() - cur_time_; - LOG_INFO("clear dtl interm result cost(us)", K(clear_cost), K(ret), - K(expire_keys_.count()), "dump count", dump_count_, - "interm count", interm_cnt_, "clean count", clean_cnt_); - } - } -} - void ObDTLIntermResultGC::reset() { expire_keys_.reset(); @@ -77,9 +41,6 @@ int ObDTLIntermResultGC::operator() (common::hash::HashMapPairret_ && cur_time_ - entry.first.start_time_ > DUMP_TIME_THRESHOLD && dis < 0) { - if (NULL != tenant_guard_) { - tenant_guard_->switch_to(entry.second->datum_store_->get_tenant_id()); - } int64_t dump_time = oceanbase::common::ObTimeUtility::current_time(); if (OB_FAIL(DTL_IR_STORE_DO(*entry.second, dump, false, true))) { LOG_WARN("fail to dump interm row store", K(ret)); @@ -95,7 +56,7 @@ int ObDTLIntermResultGC::operator() (common::hash::HashMapPairret_ = ret; // free interm result info datum store in advance for memory optimization. - ObDTLIntermResultManager::getInstance().free_interm_result_info_store(entry.second); + ObDTLIntermResultManager::free_interm_result_info_store(entry.second); } } @@ -166,50 +127,51 @@ void ObAtomicAppendPartBlockCall::operator() (common::hash::HashMapPair &entry) -{ - int ret = OB_SUCCESS; - if (entry.second->tenant_id_ == tenant_id_) { - if (OB_FAIL(expire_keys_.push_back(entry.first))) { - LOG_WARN("push back failed", K(ret)); - ret_ = ret; - } - } - return OB_SUCCESS; -} - -ObDTLIntermResultManager &ObDTLIntermResultManager::getInstance() -{ - static ObDTLIntermResultManager the_ir_manager; - return the_ir_manager; -} - - int ObDTLIntermResultManager::init() { int ret = OB_SUCCESS; - auto attr = SET_USE_500("HashBuckDTLINT"); + uint64_t tenant_id = MTL_ID(); + ObMemAttr attr(tenant_id, "HashBuckDTLINT"); + int64_t tenant_mem_limit = lib::get_tenant_memory_limit(tenant_id); + double mem_factor = static_cast(tenant_mem_limit) / lib::get_memory_limit(); + // less memory for meta tenant + if (is_meta_tenant(tenant_id)) { + mem_factor = mem_factor * 0.01; + } + if (IS_INIT) { ret = OB_INIT_TWICE; - } else if (OB_FAIL(map_.create(BUCKET_NUM, + } else if (OB_FAIL(map_.create(static_cast(DEFAULT_BUCKET_NUM * mem_factor), attr, attr))) { LOG_WARN("create hash table failed", K(ret)); - } else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::ServerGTimer, gc_, - ObDTLIntermResultGC::REFRESH_INTERVAL, true))) { - LOG_WARN("schedule interm result gc failed", K(ret)); } else { is_inited_ = true; } return ret; } +int ObDTLIntermResultManager::mtl_init(ObDTLIntermResultManager *&dtl_interm_result_manager) +{ + return dtl_interm_result_manager->init(); +} + void ObDTLIntermResultManager::destroy() { if (IS_INIT) { + erase_tenant_interm_result_info(); map_.destroy(); } } +void ObDTLIntermResultManager::mtl_destroy(ObDTLIntermResultManager *&dtl_interm_result_manager) +{ + if (nullptr != dtl_interm_result_manager) { + dtl_interm_result_manager->destroy(); + ob_delete(dtl_interm_result_manager); + dtl_interm_result_manager = nullptr; + } +} + int ObDTLIntermResultManager::get_interm_result_info(ObDTLIntermResultKey &key, ObDTLIntermResultInfo &result_info) { @@ -228,20 +190,17 @@ int ObDTLIntermResultManager::create_interm_result_info(ObMemAttr &attr, const ObDTLIntermResultMonitorInfo &monitor_info) { int ret = OB_SUCCESS; - void *ptr = NULL; - ObDTLIntermResultInfo *result_info = NULL; void *result_info_buf = NULL; - const int64_t size = sizeof(ObChunkDatumStore); - if (OB_ISNULL(result_info_buf = - static_cast(ob_malloc(sizeof(ObDTLIntermResultInfo), attr)))) { + void *datum_store_buf = NULL; + if (OB_ISNULL(result_info_buf = ob_malloc(sizeof(ObDTLIntermResultInfo), attr))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc dtl interm result info", K(ret)); - } else if (OB_ISNULL(ptr = ob_malloc(size, attr))) { + } else if (OB_ISNULL(datum_store_buf = ob_malloc(sizeof(ObChunkDatumStore), attr))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to alloc ob chunk row store ret", K(ret)); + LOG_WARN("fail to alloc ob chunk datum store", K(ret)); } else { - result_info = new(result_info_buf) ObDTLIntermResultInfo(); - result_info->datum_store_ = new(ptr) ObChunkDatumStore("DtlIntermRes"); + ObDTLIntermResultInfo *result_info = new(result_info_buf) ObDTLIntermResultInfo(); + result_info->datum_store_ = new(datum_store_buf) ObChunkDatumStore("DtlIntermRes"); result_info->is_read_ = false; result_info->trace_id_ = *ObCurTraceId::get_trace_id(); result_info->monitor_info_ = monitor_info; @@ -252,8 +211,8 @@ int ObDTLIntermResultManager::create_interm_result_info(ObMemAttr &attr, if (NULL != result_info_buf) { ob_free(result_info_buf); } - if (NULL != ptr) { - ob_free(ptr); + if (NULL != datum_store_buf) { + ob_free(datum_store_buf); } } return ret; @@ -327,7 +286,7 @@ int ObDTLIntermResultManager::clear_timeout_result_info(ObDTLIntermResultGC &gc) if (OB_SUCC(ret)) { for (int i = 0; i < gc.expire_keys_.count(); ++i) { ObDTLIntermResultKey &key = gc.expire_keys_.at(i); - if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { + if (OB_FAIL(erase_interm_result_info(key))) { LOG_WARN("fail to erase row store", K(key), K(ret)); } } @@ -339,13 +298,10 @@ int ObDTLIntermResultManager::clear_timeout_result_info(ObDTLIntermResultGC &gc) int ObDTLIntermResultManager::dump_result_info(ObDTLIntermResultGC &gc) { int ret = OB_SUCCESS; - MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); gc.gc_type_ = ObDTLIntermResultGC::DUMP; - gc.tenant_guard_ = &guard; if (OB_FAIL(map_.foreach_refactored(gc))) { LOG_WARN("fail to get row store in result manager", K(ret)); } - gc.tenant_guard_ = NULL; return ret; } @@ -408,31 +364,23 @@ int ObDTLIntermResultManager::generate_monitor_info_rows(observer::ObDTLIntermRe return ret; } -int ObDTLIntermResultManager::erase_tenant_interm_result_info(int64_t tenant_id) +int ObDTLIntermResultManager::erase_tenant_interm_result_info() { int ret = OB_SUCCESS; - ObEraseTenantIntermResultInfo eraser; - eraser.tenant_id_ = tenant_id; - if (OB_FAIL(map_.foreach_refactored(eraser))) { - LOG_WARN("fail to get tenant result info in result manager", K(ret), K(tenant_id)); - } else { - ret = eraser.ret_; - for (int i = 0; i < eraser.expire_keys_.count(); ++i) { - ObDTLIntermResultKey &key = eraser.expire_keys_.at(i); - int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { - if (OB_HASH_NOT_EXIST != tmp_ret) { - LOG_WARN("fail to erase result info", K(key), K(ret)); - ret = tmp_ret; - } + for (auto iter = map_.begin(); iter != map_.end(); ++iter) { + ObDTLIntermResultKey &key = iter->first; + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = erase_interm_result_info(key))) { + if (OB_HASH_NOT_EXIST != tmp_ret) { + LOG_WARN("fail to erase result info", K(key), K(tmp_ret)); + ret = tmp_ret; } } - if (eraser.expire_keys_.count() < 100) { - LOG_INFO("erase_tenant_interm_result_info", K(tenant_id), K(eraser.expire_keys_)); - } else { - LOG_INFO("erase_tenant_interm_result_info", K(tenant_id), K(eraser.expire_keys_.count())); - } } + if (OB_SUCC(ret)) { + LOG_INFO("erase_tenant_interm_result_info", K(MTL_ID()), K(map_.size())); + } + return ret; } @@ -489,14 +437,13 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf { int ret = OB_SUCCESS; ObDTLIntermResultInfo result_info; - if (OB_FAIL(ObDTLIntermResultManager::getInstance().get_interm_result_info(key, - result_info))) { + if (OB_FAIL(get_interm_result_info(key, result_info))) { if (OB_HASH_NOT_EXIST == ret) { ObDTLIntermResultInfoGuard result_info_guard; ObMemAttr attr(buffer.tenant_id(), "DtlIntermRes", common::ObCtxIds::EXECUTE_CTX_ID); key.start_time_ = oceanbase::common::ObTimeUtility::current_time(); ret = OB_SUCCESS; - if (OB_FAIL(ObDTLIntermResultManager::getInstance().create_interm_result_info(attr, + if (OB_FAIL(create_interm_result_info(attr, result_info_guard, ObDTLIntermResultMonitorInfo(buffer.get_dfo_key().qc_id_, buffer.get_dfo_id(), buffer.get_sqc_id())))) { @@ -505,7 +452,7 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf *result_info_guard.result_info_, init, 0, buffer.tenant_id(), common::ObCtxIds::EXECUTE_CTX_ID, "DtlIntermRes"))) { LOG_WARN("fail to init buffer", K(ret)); - } else if (OB_FAIL(ObDTLIntermResultManager::getInstance().insert_interm_result_info(key, result_info_guard.result_info_))) { + } else if (OB_FAIL(insert_interm_result_info(key, result_info_guard.result_info_))) { LOG_WARN("fail to insert row store", K(ret)); } else { int reg_dm_ret = ObDetectManagerUtils::single_dfo_register_check_item_into_dm( @@ -526,7 +473,7 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf LOG_WARN("out of index", K(ret), K(buffer.size()), K(start_pos), K(length)); } else if (append_whole_block) { ObAtomicAppendBlockCall call(buffer.buf(), length, is_eof); - if (OB_FAIL(ObDTLIntermResultManager::getInstance().atomic_append_block(key, call))) { + if (OB_FAIL(atomic_append_block(key, call))) { if (OB_HASH_NOT_EXIST == ret && oceanbase::common::ObTimeUtility::current_time() > key.time_us_) { ret = OB_TIMEOUT; @@ -540,7 +487,7 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf } } else { ObAtomicAppendPartBlockCall call(buffer.buf(), start_pos, length, rows, is_eof); - if (OB_FAIL(ObDTLIntermResultManager::getInstance().atomic_append_part_block(key, call))) { + if (OB_FAIL(atomic_append_part_block(key, call))) { if (OB_HASH_NOT_EXIST == ret && oceanbase::common::ObTimeUtility::current_time() > key.time_us_) { ret = OB_TIMEOUT; @@ -559,7 +506,7 @@ int ObDTLIntermResultManager::process_interm_result_inner(ObDtlLinkedBuffer &buf // 注意这里理论上也不会有并发问题,因为channel是点对点且串行发送的 // 所以这个接收到了,肯定没有其他线程给这个channel发送 // 尝试先从hash table中释放(尽早释放内存,其实也可以让timer来清理) - ObDTLIntermResultManager::getInstance().erase_interm_result_info(key); + erase_interm_result_info(key); } return ret; } @@ -585,6 +532,46 @@ void ObDTLIntermResultManager::dec_interm_result_ref_count(ObDTLIntermResultInfo } } +void ObDTLIntermResultManager::runTimerTask() +{ + int ret = OB_SUCCESS; + gc_.cur_time_ = oceanbase::common::ObTimeUtility::current_time(); + gc_.expire_keys_.reset(); + + // Previously, it was placed inside a foreach loop, + // which means that each element would ++gc_.dump_count_. + // In this case, if the cache is a relatively large prime number, + // it may take a long time and it keeps changing, + // so it is unlikely to enter the loop, causing a big bug. + ++gc_.dump_count_; + gc_.interm_cnt_ = 0; + // dump every_10_seconds && not_expired && unused row_store + if (OB_SUCC(ret)) { + if (OB_FAIL(dump_result_info(gc_))) { + LOG_WARN("fail to for each row store", K(ret)); + } else { + int64_t dump_cost = oceanbase::common::ObTimeUtility::current_time() - gc_.cur_time_; + LOG_INFO("dump dtl interm result cost(us)", K(dump_cost), K(ret), + "interm count", gc_.interm_cnt_, "dump count", gc_.dump_count_); + } + } + + gc_.clean_cnt_ = 0; + gc_.interm_cnt_ = 0; + gc_.cur_time_ = oceanbase::common::ObTimeUtility::current_time(); + // Cleaning up expired row_store + if (OB_SUCC(ret)) { + if (OB_FAIL(clear_timeout_result_info(gc_))) { + LOG_WARN("fail to for each row store", K(ret)); + } else { + int64_t clear_cost = oceanbase::common::ObTimeUtility::current_time() - gc_.cur_time_; + LOG_INFO("clear dtl interm result cost(us)", K(clear_cost), K(ret), + K(gc_.expire_keys_.count()), "dump count", gc_.dump_count_, + "interm count", gc_.interm_cnt_, "clean count", gc_.clean_cnt_); + } + } +} + void ObDTLIntermResultInfoGuard::set_result_info(ObDTLIntermResultInfo &result_info) { if (&result_info != result_info_) { @@ -601,3 +588,37 @@ void ObDTLIntermResultInfoGuard::reset() result_info_ = NULL; } } + +int ObDTLIntermResultManager::mtl_start(ObDTLIntermResultManager *&dtl_interm_result_manager) +{ + int ret = OB_SUCCESS; + if (OB_LIKELY(nullptr != dtl_interm_result_manager)) { + if (OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer*)->get_tg_id(), dtl_interm_result_manager->get_gc_task(), + ObDTLIntermResultGCTask::REFRESH_INTERVAL, true))) { + LOG_WARN("failed to scheduler flush all task", K(ret)); + } else { + dtl_interm_result_manager->get_gc_task().disable_timeout_check(); + dtl_interm_result_manager->get_gc_task().dtl_interm_result_manager_ = dtl_interm_result_manager; + } + } + return ret; +} + +void ObDTLIntermResultManager::mtl_stop(ObDTLIntermResultManager *&dtl_interm_result_manager) +{ + if (OB_LIKELY(nullptr != dtl_interm_result_manager)) { + TG_CANCEL_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), dtl_interm_result_manager->get_gc_task()); + } +} + +void ObDTLIntermResultManager::mtl_wait(ObDTLIntermResultManager *&dtl_interm_result_manager) +{ + if (OB_LIKELY(nullptr != dtl_interm_result_manager)) { + TG_WAIT_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), dtl_interm_result_manager->get_gc_task()); + } +} + +void ObDTLIntermResultGCTask::runTimerTask() +{ + dtl_interm_result_manager_->runTimerTask(); +} \ No newline at end of file diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.h b/src/sql/dtl/ob_dtl_interm_result_manager.h index cd3d7f8906..a1c1e3d512 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.h +++ b/src/sql/dtl/ob_dtl_interm_result_manager.h @@ -41,7 +41,7 @@ struct ObDTLIntermResultMonitorInfo ObDTLIntermResultMonitorInfo(int64_t qc_id, int64_t dfo_id, int64_t sqc_id) : qc_id_(qc_id), dfo_id_(dfo_id), sqc_id_(sqc_id) { } - TO_STRING_KV(K_(qc_id), K_(dfo_id), K_(sqc_id)); + TO_STRING_KV(K_(qc_id), K_(dfo_id), K_(sqc_id)); int64_t qc_id_; int64_t dfo_id_; int64_t sqc_id_; @@ -50,7 +50,7 @@ struct ObDTLIntermResultMonitorInfo class ObDtlLinkedBuffer; struct ObDTLIntermResultKey { - ObDTLIntermResultKey() : channel_id_(0), time_us_(0), + ObDTLIntermResultKey() : channel_id_(0), time_us_(0), start_time_(0), batch_id_(0) {} int64_t channel_id_; int64_t time_us_; @@ -127,21 +127,19 @@ struct ObDTLIntermResultInfoGuard #define DTL_IR_STORE_DO(ir, act, ...) \ ((ir).datum_store_->act(__VA_ARGS__)) -class ObDTLIntermResultGC : public common::ObTimerTask +class ObDTLIntermResultGC { friend class ObDTLIntermResultManager; public: ObDTLIntermResultGC() : cur_time_(0), expire_keys_(), gc_type_(NOT_INIT), dump_count_(0), - interm_cnt_(0), clean_cnt_(0), tenant_guard_(nullptr) + interm_cnt_(0), clean_cnt_(0) {} virtual ~ObDTLIntermResultGC() = default; void reset(); int operator() (common::hash::HashMapPair &entry); - void runTimerTask(); public: - const static int64_t REFRESH_INTERVAL = 10 * 1000L * 1000L; // 10秒间隔 const static int64_t DUMP_TIME_THRESHOLD = 10 * 1000L * 1000L; // 超过10秒dump const static int64_t CLEAR_TIME_THRESHOLD = 10 * 1000L * 1000L; // 超过10秒清理 public: @@ -159,10 +157,8 @@ private: int64_t dump_count_; int64_t interm_cnt_; int64_t clean_cnt_; - share::ObTenantSwitchGuard *tenant_guard_; }; - class ObAtomicGetIntermResultInfoCall { public: @@ -210,26 +206,22 @@ public: bool is_eof_; }; -class ObEraseTenantIntermResultInfo +class ObDTLIntermResultGCTask : public common::ObTimerTask { public: - ObEraseTenantIntermResultInfo() : tenant_id_(OB_INVALID_ID), expire_keys_(), ret_(common::OB_SUCCESS) {} - ~ObEraseTenantIntermResultInfo() = default; - int operator() (common::hash::HashMapPair &entry); -public: - uint64_t tenant_id_; - common::ObSEArray expire_keys_; - int ret_; + ObDTLIntermResultGCTask() : dtl_interm_result_manager_(NULL) {} + virtual ~ObDTLIntermResultGCTask() {} + virtual void runTimerTask() override; + const static int64_t REFRESH_INTERVAL = 10 * 1000L * 1000L; // 10秒间隔 + ObDTLIntermResultManager *dtl_interm_result_manager_; }; class ObDTLIntermResultManager { - friend class ObDTLIntermResultGC; + friend class ObDTLIntermResultGCTask; public: - static ObDTLIntermResultManager &getInstance(); - static int process_interm_result(ObDtlLinkedBuffer *buffer, int64_t channel_id); - static int process_interm_result_inner(ObDtlLinkedBuffer &buffer, + int process_interm_result(ObDtlLinkedBuffer *buffer, int64_t channel_id); + int process_interm_result_inner(ObDtlLinkedBuffer &buffer, ObDTLIntermResultKey &key, int64_t start_pos, int64_t length, @@ -252,24 +244,34 @@ public: int atomic_append_block(ObDTLIntermResultKey &key, ObAtomicAppendBlockCall &call); int atomic_append_part_block(ObDTLIntermResultKey &key, ObAtomicAppendPartBlockCall &call); int init(); + static int mtl_init(ObDTLIntermResultManager* &dtl_interm_result_manager); void destroy(); + static void mtl_destroy(ObDTLIntermResultManager *&dtl_interm_result_manager); int generate_monitor_info_rows(observer::ObDTLIntermResultMonitorInfoGetter &monitor_info_getter); - int erase_tenant_interm_result_info(int64_t tenant_id); + int erase_tenant_interm_result_info(); static void free_interm_result_info_store(ObDTLIntermResultInfo *result_info); static void free_interm_result_info(ObDTLIntermResultInfo *result_info); static void inc_interm_result_ref_count(ObDTLIntermResultInfo *result_info); static void dec_interm_result_ref_count(ObDTLIntermResultInfo *&result_info); + void runTimerTask(); + static int mtl_start(ObDTLIntermResultManager *&dtl_interm_result_manager); + static void mtl_stop(ObDTLIntermResultManager *&dtl_interm_result_manager); + static void mtl_wait(ObDTLIntermResultManager *&dtl_interm_result_manager); + ObDTLIntermResultGCTask &get_gc_task() { return gc_task_; } + + ObDTLIntermResultManager(); + ~ObDTLIntermResultManager(); private: - // 由于此中间结果管理器是全局结构, 基于性能考虑,减少锁冲突设置bucket_num为50w. - static const int64_t BUCKET_NUM = 500000; //50w + // 由于此中间结果管理器是全局结构, 基于性能考虑, 减少锁冲突设置bucket_num为50w. + static const int64_t DEFAULT_BUCKET_NUM = 500000; //50w + static const int64_t MAX_TENANT_MEM_LIMIT = 17179869184; //16G private: MAP map_; bool is_inited_; int64_t dir_id_; ObDTLIntermResultGC gc_; + ObDTLIntermResultGCTask gc_task_; private: - ObDTLIntermResultManager(); - ~ObDTLIntermResultManager(); DISALLOW_COPY_AND_ASSIGN(ObDTLIntermResultManager); }; diff --git a/src/sql/dtl/ob_dtl_local_channel.cpp b/src/sql/dtl/ob_dtl_local_channel.cpp index b6f0ac893c..6e8ea984d1 100644 --- a/src/sql/dtl/ob_dtl_local_channel.cpp +++ b/src/sql/dtl/ob_dtl_local_channel.cpp @@ -81,8 +81,10 @@ int ObDtlLocalChannel::send_shared_message(ObDtlLinkedBuffer *&buf) is_first = buf->is_data_msg() && 1 == buf->seq_no(); is_eof = buf->is_eof(); if (buf->is_data_msg() && buf->use_interm_result()) { - if (OB_FAIL(ObDTLIntermResultManager::process_interm_result(buf, peer_id_))) { - LOG_WARN("fail to process internal result", K(ret)); + MTL_SWITCH(buf->tenant_id()) { + if (OB_FAIL(MTL(ObDTLIntermResultManager*)->process_interm_result(buf, peer_id_))) { + LOG_WARN("fail to process internal result", K(ret)); + } } } else if (OB_FAIL(DTL.get_channel(peer_id_, chan))) { int tmp_ret = ret; diff --git a/src/sql/dtl/ob_dtl_rpc_processor.cpp b/src/sql/dtl/ob_dtl_rpc_processor.cpp index 16d0f54432..edb06770ad 100644 --- a/src/sql/dtl/ob_dtl_rpc_processor.cpp +++ b/src/sql/dtl/ob_dtl_rpc_processor.cpp @@ -42,8 +42,10 @@ int ObDtlSendMessageP::process_msg(ObDtlRpcDataResponse &response, ObDtlSendArgs ObDtlChannel *chan = nullptr; response.is_block_ = false; if (arg.buffer_.is_data_msg() && arg.buffer_.use_interm_result()) { - if (OB_FAIL(ObDTLIntermResultManager::process_interm_result(&arg.buffer_, arg.chid_))) { - LOG_WARN("fail to process internal result", K(ret)); + MTL_SWITCH(arg.buffer_.tenant_id()) { + if (OB_FAIL(MTL(ObDTLIntermResultManager*)->process_interm_result(&arg.buffer_, arg.chid_))) { + LOG_WARN("fail to process internal result", K(ret)); + } } } else if (OB_FAIL(DTL.get_channel(arg.chid_, chan))) { int tmp_ret = ret; @@ -164,7 +166,7 @@ int ObDtlSendMessageP::process_px_bloom_filter_data(ObDtlLinkedBuffer *&buffer) ObPxBloomFilter *filter = NULL; if (OB_FAIL(ObDtlLinkedBuffer::deserialize_msg_header(*buffer, header))) { LOG_WARN("fail to decode header of buffer", K(ret)); - } + } if (OB_SUCC(ret)) { const char *buf = buffer->buf(); int64_t size = buffer->size(); @@ -172,13 +174,13 @@ int ObDtlSendMessageP::process_px_bloom_filter_data(ObDtlLinkedBuffer *&buffer) if (OB_FAIL(common::serialization::decode(buf, size, pos, bf_data))) { LOG_WARN("fail to decode bloom filter data", K(ret)); } else { - ObPXBloomFilterHashWrapper bf_key(bf_data.tenant_id_, bf_data.filter_id_, + ObPXBloomFilterHashWrapper bf_key(bf_data.tenant_id_, bf_data.filter_id_, bf_data.server_id_, bf_data.px_sequence_id_, 0/*task_id*/); if (OB_FAIL(ObPxBloomFilterManager::instance().get_px_bf_for_merge_filter( bf_key, filter))) { LOG_WARN("fail to get px bloom filter", K(ret)); } - // get_px_bf_for_merge_filter只有在成功后会增加filter的引用计数 + // get_px_bf_for_merge_filter只有在成功后会增加filter的引用计数 if (OB_SUCC(ret) && OB_NOT_NULL(filter)) { if (OB_FAIL(filter->merge_filter(&bf_data.filter_))) { LOG_WARN("fail to merge filter", K(ret)); @@ -187,7 +189,7 @@ int ObDtlSendMessageP::process_px_bloom_filter_data(ObDtlLinkedBuffer *&buffer) } // merge以及process操作完成之后, 需要减少其引用计数. (void)filter->dec_merge_filter_count(); - } + } } } } diff --git a/src/sql/engine/basic/ob_temp_table_access_op.cpp b/src/sql/engine/basic/ob_temp_table_access_op.cpp index e91f7d7089..f6ce6833d5 100644 --- a/src/sql/engine/basic/ob_temp_table_access_op.cpp +++ b/src/sql/engine/basic/ob_temp_table_access_op.cpp @@ -365,7 +365,7 @@ int ObTempTableAccessOp::locate_interm_result(int64_t result_id) // The current operation of obtaining intermediate results and // the operation of the background thread of dumping intermediate results // are mutually exclusive - if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().atomic_get_interm_result_info( + if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->atomic_get_interm_result_info( dtl_int_key, result_info_guard_))) { LOG_WARN("failed to create row store.", K(ret)); } else if (FALSE_IT(result_info = result_info_guard_.result_info_)) { diff --git a/src/sql/engine/basic/ob_temp_table_insert_op.cpp b/src/sql/engine/basic/ob_temp_table_insert_op.cpp index 5db837b627..829fb1c658 100644 --- a/src/sql/engine/basic/ob_temp_table_insert_op.cpp +++ b/src/sql/engine/basic/ob_temp_table_insert_op.cpp @@ -260,7 +260,7 @@ int ObTempTableInsertOp::init_chunk_row_store(ObDTLIntermResultInfo *&chunk_row_ uint64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id(); ObMemAttr mem_attr(tenant_id, "TempTableInsert", ObCtxIds::WORK_AREA); dtl::ObDTLIntermResultInfoGuard result_info_guard; - if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().create_interm_result_info( + if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->create_interm_result_info( mem_attr, result_info_guard, dtl::ObDTLIntermResultMonitorInfo( @@ -305,11 +305,11 @@ int ObTempTableInsertOp::insert_chunk_row_store() if (OB_ISNULL(phy_plan_ctx = GET_PHY_PLAN_CTX(ctx_))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("phy_plan_ctx is NULL", K(ret)); - } else if (!MY_SPEC.is_distributed_ && + } else if (!MY_SPEC.is_distributed_ && all_datum_store_.empty() && OB_FAIL(init_chunk_row_store(chunk_row_store))) { //local temp table需要一个空的row store占位 - LOG_WARN("failed to init chunk row store", K(ret)); + LOG_WARN("failed to init chunk row store", K(ret)); } else if (!MY_SPEC.is_distributed_ && all_datum_store_.count() != 1) { ret = OB_ERR_UNEXPECTED; LOG_WARN("local temp table shoud have one chunk row store", K(ret)); @@ -334,12 +334,12 @@ int ObTempTableInsertOp::insert_chunk_row_store() row_store->set_eof(true); //chunk row store不需要管理dump逻辑 row_store->is_read_ = true; - if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().insert_interm_result_info( + if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->insert_interm_result_info( dtl_int_key, row_store))) { LOG_WARN("failed to insert row store.", K(ret), K(dtl_int_key.channel_id_)); } else if (OB_FAIL(keys_insert.push_back(dtl_int_key))) { LOG_WARN("failed to push back key", K(ret)); - dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(dtl_int_key); + MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(dtl_int_key); } else { row_store->datum_store_->reset_callback(); ObPxSqcHandler *handler = ctx_.get_sqc_handler(); @@ -361,7 +361,7 @@ int ObTempTableInsertOp::insert_chunk_row_store() if (OB_FAIL(ret)) { //异常处理 for (int64_t i = 0; i < keys_insert.count(); ++i) { - dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(keys_insert.at(i)); + MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(keys_insert.at(i)); } } else { clear_all_datum_store(); diff --git a/src/sql/engine/basic/ob_temp_table_transformation_op.cpp b/src/sql/engine/basic/ob_temp_table_transformation_op.cpp index 33d897dac1..60a7484cd2 100644 --- a/src/sql/engine/basic/ob_temp_table_transformation_op.cpp +++ b/src/sql/engine/basic/ob_temp_table_transformation_op.cpp @@ -240,7 +240,7 @@ int ObTempTableTransformationOp::destory_local_interm_results(ObIArray LOG_TRACE("destory interm results", K(get_exec_ctx().get_addr()), K(result_ids)); for (int64_t i = 0; OB_SUCC(ret) && i < result_ids.count(); ++i) { dtl_int_key.channel_id_ = result_ids.at(i); - if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info( + if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info( dtl_int_key))) { if (OB_HASH_NOT_EXIST == ret) { ret = OB_SUCCESS; diff --git a/src/sql/engine/px/exchange/ob_px_receive_op.cpp b/src/sql/engine/px/exchange/ob_px_receive_op.cpp index cda0d5d7fa..599e176a19 100644 --- a/src/sql/engine/px/exchange/ob_px_receive_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_receive_op.cpp @@ -382,7 +382,7 @@ int ObPxReceiveOp::inner_rescan() channel->reset_state(); channel->set_batch_id(ctx_.get_px_batch_id()); channel->reset_px_row_iterator(); - release_channel_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key); + release_channel_ret = MTL(ObDTLIntermResultManager*)->erase_interm_result_info(key); if (release_channel_ret != common::OB_SUCCESS) { LOG_WARN("fail to release recieve internal result", KR(release_channel_ret), K(ret)); } @@ -541,7 +541,7 @@ int ObPxReceiveOp::erase_dtl_interm_result() for (int64_t batch_id = ctx_.get_px_batch_id(); batch_id < PX_RESCAN_BATCH_ROW_COUNT && OB_SUCC(ret); batch_id++) { key.batch_id_ = batch_id; - if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { + if (OB_FAIL(MTL(ObDTLIntermResultManager*)->erase_interm_result_info(key))) { if (OB_HASH_NOT_EXIST == ret) { ret = OB_SUCCESS; break; diff --git a/src/sql/engine/px/ob_px_coord_op.cpp b/src/sql/engine/px/ob_px_coord_op.cpp index 09e3e655dd..e4fccaf647 100644 --- a/src/sql/engine/px/ob_px_coord_op.cpp +++ b/src/sql/engine/px/ob_px_coord_op.cpp @@ -1121,7 +1121,7 @@ int ObPxCoordOp::erase_dtl_interm_result() key.channel_id_ = ci.chid_; for (int j = 0; j < last_px_batch_rescan_size_; ++j) { key.batch_id_ = j; - if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { + if (OB_FAIL(MTL(ObDTLIntermResultManager*)->erase_interm_result_info(key))) { LOG_TRACE("fail to release recieve internal result", K(ret)); } } diff --git a/src/sql/engine/px/ob_px_rpc_processor.cpp b/src/sql/engine/px/ob_px_rpc_processor.cpp index 6957ef5dc0..7c9f6b1ef6 100644 --- a/src/sql/engine/px/ob_px_rpc_processor.cpp +++ b/src/sql/engine/px/ob_px_rpc_processor.cpp @@ -660,7 +660,7 @@ int ObPxCleanDtlIntermResP::process() key.channel_id_ = ch_set.get_ch_info_set().at(ch_idx).chid_; for (int64_t batch_id = 0; batch_id < batch_size && OB_SUCC(ret); batch_id++) { key.batch_id_= batch_id; - if (OB_FAIL(dtl::ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) { + if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(key))) { if (OB_HASH_NOT_EXIST == ret) { // interm result is written from batch_id = 0 to batch_size, // if some errors happen when batch_id = i, no interm result of batch_id > i will be written. diff --git a/src/sql/executor/ob_executor_rpc_processor.cpp b/src/sql/executor/ob_executor_rpc_processor.cpp index fb85762448..a518a7926d 100644 --- a/src/sql/executor/ob_executor_rpc_processor.cpp +++ b/src/sql/executor/ob_executor_rpc_processor.cpp @@ -68,11 +68,10 @@ int ObRpcEraseIntermResultP::process() int ret = OB_SUCCESS; LOG_TRACE("receive erase interm result request", K(arg_)); dtl::ObDTLIntermResultKey dtl_int_key; - dtl::ObDTLIntermResultManager &mgr = dtl::ObDTLIntermResultManager::getInstance(); ObIArray &interm_result_ids = arg_.interm_result_ids_; for (int64_t i = 0; OB_SUCC(ret) && i < interm_result_ids.count(); ++i) { dtl_int_key.channel_id_ = interm_result_ids.at(i); - if (OB_FAIL(mgr.erase_interm_result_info(dtl_int_key))) { + if (OB_FAIL(MTL(dtl::ObDTLIntermResultManager*)->erase_interm_result_info(dtl_int_key))) { LOG_WARN("failed to erase interm result info in manager.", K(ret)); } }