allow only compaction task/dag set compaction mem ctx
This commit is contained in:
@ -1698,6 +1698,16 @@ bool ObTenantDagWorker::get_force_cancel_flag()
|
|||||||
return flag;
|
return flag;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObTenantDagWorker::hold_by_compaction_dag()
|
||||||
|
{
|
||||||
|
bool bret = false;
|
||||||
|
ObIDag *dag = nullptr;
|
||||||
|
if (OB_NOT_NULL(task_) && OB_NOT_NULL(dag = task_->get_dag())) {
|
||||||
|
bret = is_compaction_dag(dag->get_type());
|
||||||
|
}
|
||||||
|
return bret;
|
||||||
|
}
|
||||||
|
|
||||||
void ObTenantDagWorker::run1()
|
void ObTenantDagWorker::run1()
|
||||||
{
|
{
|
||||||
self_ = this;
|
self_ = this;
|
||||||
|
|||||||
@ -720,6 +720,7 @@ public:
|
|||||||
static void set_mem_ctx(compaction::ObCompactionMemoryContext *mem_ctx) { if (nullptr == mem_ctx_) { mem_ctx_ = mem_ctx; } }
|
static void set_mem_ctx(compaction::ObCompactionMemoryContext *mem_ctx) { if (nullptr == mem_ctx_) { mem_ctx_ = mem_ctx; } }
|
||||||
uint64_t get_group_id() { return group_id_; }
|
uint64_t get_group_id() { return group_id_; }
|
||||||
bool get_force_cancel_flag();
|
bool get_force_cancel_flag();
|
||||||
|
bool hold_by_compaction_dag();
|
||||||
private:
|
private:
|
||||||
void notify(DagWorkerStatus status);
|
void notify(DagWorkerStatus status);
|
||||||
void reset_compaction_thread_locals() { is_reserve_mode_ = false; mem_ctx_ = nullptr; }
|
void reset_compaction_thread_locals() { is_reserve_mode_ = false; mem_ctx_ = nullptr; }
|
||||||
@ -1419,7 +1420,14 @@ inline bool is_reserve_mode()
|
|||||||
#define SET_MEM_CTX(mem_ctx) \
|
#define SET_MEM_CTX(mem_ctx) \
|
||||||
({ \
|
({ \
|
||||||
share::ObTenantDagWorker *worker = share::ObTenantDagWorker::self(); \
|
share::ObTenantDagWorker *worker = share::ObTenantDagWorker::self(); \
|
||||||
worker->set_mem_ctx(&mem_ctx); \
|
if (NULL != worker) { \
|
||||||
|
if (worker->hold_by_compaction_dag()) { \
|
||||||
|
worker->set_mem_ctx(&mem_ctx); \
|
||||||
|
} else { \
|
||||||
|
COMMON_LOG_RET(WARN, OB_ERR_UNEXPECTED, \
|
||||||
|
"only compaction dag can set memctx", KPC(worker)); \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
})
|
})
|
||||||
|
|
||||||
#define CURRENT_MEM_CTX() \
|
#define CURRENT_MEM_CTX() \
|
||||||
@ -1427,7 +1435,12 @@ inline bool is_reserve_mode()
|
|||||||
compaction::ObCompactionMemoryContext *mem_ctx = nullptr; \
|
compaction::ObCompactionMemoryContext *mem_ctx = nullptr; \
|
||||||
share::ObTenantDagWorker *worker = share::ObTenantDagWorker::self(); \
|
share::ObTenantDagWorker *worker = share::ObTenantDagWorker::self(); \
|
||||||
if (NULL != worker) { \
|
if (NULL != worker) { \
|
||||||
mem_ctx = worker->get_mem_ctx(); \
|
if (worker->hold_by_compaction_dag()) { \
|
||||||
|
mem_ctx = worker->get_mem_ctx(); \
|
||||||
|
} else { \
|
||||||
|
COMMON_LOG_RET(WARN, OB_ERR_UNEXPECTED, \
|
||||||
|
"memctx only provided for compaction dag", KPC(worker)); \
|
||||||
|
} \
|
||||||
} \
|
} \
|
||||||
mem_ctx; \
|
mem_ctx; \
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user