diff --git a/deps/oblib/src/lib/utility/ob_tracepoint.h b/deps/oblib/src/lib/utility/ob_tracepoint.h index 38a857ee3..fe4aff68e 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint.h @@ -788,6 +788,7 @@ class EventTable EN_ENABLE_TABLE_LOCK = 2102, EN_ENABLE_ROWKEY_CONFLICT_CHECK = 2103, EN_ENABLE_ORA_DECINT_CONST = 2104, + EN_ENABLE_CLEAN_INTERM_RES = 2105, // WR && ASH EN_CLOSE_ASH = 2201, diff --git a/src/share/detect/ob_detect_callback.cpp b/src/share/detect/ob_detect_callback.cpp index 17e764695..3abcd5d3d 100644 --- a/src/share/detect/ob_detect_callback.cpp +++ b/src/share/detect/ob_detect_callback.cpp @@ -259,9 +259,12 @@ int ObSqcDetectCB::do_callback() int ObSingleDfoDetectCB::do_callback() { int ret = OB_SUCCESS; - ret = MTL(sql::dtl::ObDTLIntermResultManager*)->erase_interm_result_info(key_, false); - ret = ret == OB_HASH_NOT_EXIST ? OB_SUCCESS : ret; - LIB_LOG(WARN, "[DM] single dfo erase_interm_result_info", K(ret), K(key_), K_(trace_id)); + int clean_ret = OB_E(EventTable::EN_ENABLE_CLEAN_INTERM_RES) OB_SUCCESS; + if (OB_SUCC(clean_ret)) { + ret = MTL(sql::dtl::ObDTLIntermResultManager*)->erase_interm_result_info(key_, false); + ret = ret == OB_HASH_NOT_EXIST ? OB_SUCCESS : ret; + LIB_LOG(WARN, "[DM] single dfo erase_interm_result_info", K(ret), K(key_), K_(trace_id)); + } return ret; } diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.cpp b/src/sql/dtl/ob_dtl_interm_result_manager.cpp index 625ef098f..16b4a2459 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.cpp +++ b/src/sql/dtl/ob_dtl_interm_result_manager.cpp @@ -20,6 +20,7 @@ #include "sql/dtl/ob_dtl_linked_buffer.h" #include "sql/dtl/ob_dtl_msg_type.h" #include "share/detect/ob_detect_manager_utils.h" +#include "deps/oblib/src/lib/hash/ob_hashtable.h" using namespace oceanbase; using namespace common; @@ -168,7 +169,6 @@ void ObDTLIntermResultManager::destroy() void ObDTLIntermResultManager::mtl_destroy(ObDTLIntermResultManager *&dtl_interm_result_manager) { if (nullptr != dtl_interm_result_manager) { - dtl_interm_result_manager->destroy(); ob_delete(dtl_interm_result_manager); dtl_interm_result_manager = nullptr; } @@ -262,7 +262,7 @@ void ObDTLIntermResultManager::free_interm_result_info(ObDTLIntermResultInfo *re } } -int ObDTLIntermResultManager::erase_interm_result_info(ObDTLIntermResultKey &key, +int ObDTLIntermResultManager::erase_interm_result_info(const ObDTLIntermResultKey &key, bool need_unregister_check_item_from_dm) { int ret = OB_SUCCESS; @@ -369,20 +369,27 @@ int ObDTLIntermResultManager::generate_monitor_info_rows(observer::ObDTLIntermRe int ObDTLIntermResultManager::erase_tenant_interm_result_info() { int ret = OB_SUCCESS; - for (auto iter = map_.begin(); iter != map_.end(); ++iter) { - ObDTLIntermResultKey &key = iter->first; - int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = erase_interm_result_info(key))) { - if (OB_HASH_NOT_EXIST != tmp_ret) { - LOG_WARN("fail to erase result info", K(key), K(tmp_ret)); - ret = tmp_ret; + MAP::bucket_iterator bucket_it = map_.bucket_begin(); + while (bucket_it != map_.bucket_end()) { + MAP::hashtable::bucket_lock_cond blc(*bucket_it); + MAP::hashtable::writelocker locker(blc.lock()); + MAP::hashtable::hashbucket::const_iterator node_it = bucket_it->node_begin(); + while (node_it != bucket_it->node_end()) { + int tmp_ret = OB_SUCCESS; + const ObDTLIntermResultKey &key = node_it->first; + node_it++; + if (OB_SUCCESS != (tmp_ret = erase_interm_result_info(key))) { + if (OB_HASH_NOT_EXIST != tmp_ret) { + LOG_WARN("fail to erase result info", K(key), K(tmp_ret)); + ret = tmp_ret; + } } } + bucket_it++; } if (OB_SUCC(ret)) { LOG_INFO("erase_tenant_interm_result_info", K(MTL_ID()), K(map_.size())); } - return ret; } diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.h b/src/sql/dtl/ob_dtl_interm_result_manager.h index a1c1e3d51..b07320b79 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.h +++ b/src/sql/dtl/ob_dtl_interm_result_manager.h @@ -232,7 +232,7 @@ public: int get_interm_result_info(ObDTLIntermResultKey &key, ObDTLIntermResultInfo &result_info); int create_interm_result_info(ObMemAttr &attr, ObDTLIntermResultInfoGuard &result_info_guard, const ObDTLIntermResultMonitorInfo &monitor_info); - int erase_interm_result_info(ObDTLIntermResultKey &key, bool need_unregister_check_item_from_dm=true); + int erase_interm_result_info(const ObDTLIntermResultKey &key, bool need_unregister_check_item_from_dm=true); int insert_interm_result_info(ObDTLIntermResultKey &key, ObDTLIntermResultInfo *&result_info); // 以下两个接口会持有bucket读锁. int clear_timeout_result_info(ObDTLIntermResultGC &gc); diff --git a/src/sql/engine/px/ob_dfo_scheduler.cpp b/src/sql/engine/px/ob_dfo_scheduler.cpp index f86455436..e75308f19 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.cpp +++ b/src/sql/engine/px/ob_dfo_scheduler.cpp @@ -627,7 +627,10 @@ void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx) int ret = OB_SUCCESS; const ObIArray &all_dfos = coord_info_.dfo_mgr_.get_all_dfos(); ObDfo *last_dfo = all_dfos.at(all_dfos.count() - 1); - if (OB_NOT_NULL(last_dfo) && last_dfo->is_scheduled() && OB_NOT_NULL(last_dfo->parent()) + int clean_ret = OB_E(EventTable::EN_ENABLE_CLEAN_INTERM_RES) OB_SUCCESS; + if (clean_ret != OB_SUCCESS) { + // Fault injection: Do not clean up interm results. + } else if (OB_NOT_NULL(last_dfo) && last_dfo->is_scheduled() && OB_NOT_NULL(last_dfo->parent()) && last_dfo->parent()->is_root_dfo()) { // all dfo scheduled, do nothing. LOG_TRACE("all dfo scheduled.");