Fix the implementation issue with deleting all intermediate results for a tenant.

This commit is contained in:
obdev 2024-02-08 02:57:04 +00:00 committed by ob-robot
parent 4e536c6326
commit 7448206eb9
5 changed files with 29 additions and 15 deletions

View File

@ -788,6 +788,7 @@ class EventTable
EN_ENABLE_TABLE_LOCK = 2102,
EN_ENABLE_ROWKEY_CONFLICT_CHECK = 2103,
EN_ENABLE_ORA_DECINT_CONST = 2104,
EN_ENABLE_CLEAN_INTERM_RES = 2105,
// WR && ASH
EN_CLOSE_ASH = 2201,

View File

@ -259,9 +259,12 @@ int ObSqcDetectCB::do_callback()
int ObSingleDfoDetectCB::do_callback()
{
int ret = OB_SUCCESS;
ret = MTL(sql::dtl::ObDTLIntermResultManager*)->erase_interm_result_info(key_, false);
ret = ret == OB_HASH_NOT_EXIST ? OB_SUCCESS : ret;
LIB_LOG(WARN, "[DM] single dfo erase_interm_result_info", K(ret), K(key_), K_(trace_id));
int clean_ret = OB_E(EventTable::EN_ENABLE_CLEAN_INTERM_RES) OB_SUCCESS;
if (OB_SUCC(clean_ret)) {
ret = MTL(sql::dtl::ObDTLIntermResultManager*)->erase_interm_result_info(key_, false);
ret = ret == OB_HASH_NOT_EXIST ? OB_SUCCESS : ret;
LIB_LOG(WARN, "[DM] single dfo erase_interm_result_info", K(ret), K(key_), K_(trace_id));
}
return ret;
}

View File

@ -20,6 +20,7 @@
#include "sql/dtl/ob_dtl_linked_buffer.h"
#include "sql/dtl/ob_dtl_msg_type.h"
#include "share/detect/ob_detect_manager_utils.h"
#include "deps/oblib/src/lib/hash/ob_hashtable.h"
using namespace oceanbase;
using namespace common;
@ -168,7 +169,6 @@ void ObDTLIntermResultManager::destroy()
void ObDTLIntermResultManager::mtl_destroy(ObDTLIntermResultManager *&dtl_interm_result_manager)
{
if (nullptr != dtl_interm_result_manager) {
dtl_interm_result_manager->destroy();
ob_delete(dtl_interm_result_manager);
dtl_interm_result_manager = nullptr;
}
@ -262,7 +262,7 @@ void ObDTLIntermResultManager::free_interm_result_info(ObDTLIntermResultInfo *re
}
}
int ObDTLIntermResultManager::erase_interm_result_info(ObDTLIntermResultKey &key,
int ObDTLIntermResultManager::erase_interm_result_info(const ObDTLIntermResultKey &key,
bool need_unregister_check_item_from_dm)
{
int ret = OB_SUCCESS;
@ -369,20 +369,27 @@ int ObDTLIntermResultManager::generate_monitor_info_rows(observer::ObDTLIntermRe
int ObDTLIntermResultManager::erase_tenant_interm_result_info()
{
int ret = OB_SUCCESS;
for (auto iter = map_.begin(); iter != map_.end(); ++iter) {
ObDTLIntermResultKey &key = iter->first;
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = erase_interm_result_info(key))) {
if (OB_HASH_NOT_EXIST != tmp_ret) {
LOG_WARN("fail to erase result info", K(key), K(tmp_ret));
ret = tmp_ret;
MAP::bucket_iterator bucket_it = map_.bucket_begin();
while (bucket_it != map_.bucket_end()) {
MAP::hashtable::bucket_lock_cond blc(*bucket_it);
MAP::hashtable::writelocker locker(blc.lock());
MAP::hashtable::hashbucket::const_iterator node_it = bucket_it->node_begin();
while (node_it != bucket_it->node_end()) {
int tmp_ret = OB_SUCCESS;
const ObDTLIntermResultKey &key = node_it->first;
node_it++;
if (OB_SUCCESS != (tmp_ret = erase_interm_result_info(key))) {
if (OB_HASH_NOT_EXIST != tmp_ret) {
LOG_WARN("fail to erase result info", K(key), K(tmp_ret));
ret = tmp_ret;
}
}
}
bucket_it++;
}
if (OB_SUCC(ret)) {
LOG_INFO("erase_tenant_interm_result_info", K(MTL_ID()), K(map_.size()));
}
return ret;
}

View File

@ -232,7 +232,7 @@ public:
int get_interm_result_info(ObDTLIntermResultKey &key, ObDTLIntermResultInfo &result_info);
int create_interm_result_info(ObMemAttr &attr, ObDTLIntermResultInfoGuard &result_info_guard,
const ObDTLIntermResultMonitorInfo &monitor_info);
int erase_interm_result_info(ObDTLIntermResultKey &key, bool need_unregister_check_item_from_dm=true);
int erase_interm_result_info(const ObDTLIntermResultKey &key, bool need_unregister_check_item_from_dm=true);
int insert_interm_result_info(ObDTLIntermResultKey &key, ObDTLIntermResultInfo *&result_info);
// 以下两个接口会持有bucket读锁.
int clear_timeout_result_info(ObDTLIntermResultGC &gc);

View File

@ -627,7 +627,10 @@ void ObSerialDfoScheduler::clean_dtl_interm_result(ObExecContext &exec_ctx)
int ret = OB_SUCCESS;
const ObIArray<ObDfo *> &all_dfos = coord_info_.dfo_mgr_.get_all_dfos();
ObDfo *last_dfo = all_dfos.at(all_dfos.count() - 1);
if (OB_NOT_NULL(last_dfo) && last_dfo->is_scheduled() && OB_NOT_NULL(last_dfo->parent())
int clean_ret = OB_E(EventTable::EN_ENABLE_CLEAN_INTERM_RES) OB_SUCCESS;
if (clean_ret != OB_SUCCESS) {
// Fault injection: Do not clean up interm results.
} else if (OB_NOT_NULL(last_dfo) && last_dfo->is_scheduled() && OB_NOT_NULL(last_dfo->parent())
&& last_dfo->parent()->is_root_dfo()) {
// all dfo scheduled, do nothing.
LOG_TRACE("all dfo scheduled.");