[Fix](executor)Fix scan entity core #21696

After the last time to call scan_task.scan_func(),the should be ended, this means PipelineFragmentContext could be released.
Then after PipelineFragmentContext is released, visiting its field such as query_ctx or _state may cause core dump.
But it can only explain core 2

void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
                                                taskgroup::ScanTaskTaskGroupQueue* scan_queue) {
    while (!_is_closed) {
        taskgroup::ScanTask scan_task;
        auto success = scan_queue->take(&scan_task);
        if (success) {
            int64_t time_spent = 0;
            {
                SCOPED_RAW_TIMER(&time_spent);
                scan_task.scan_func();
            }
            scan_queue->update_statistics(scan_task, time_spent);
        }
    }
}
This commit is contained in:
wangbo
2023-07-11 15:56:13 +08:00
committed by GitHub
parent 4cbd99ad9b
commit d3317aa33b
3 changed files with 13 additions and 6 deletions

View File

@ -24,11 +24,14 @@
namespace doris {
namespace taskgroup {
static void empty_function() {}
ScanTask::ScanTask() : ScanTask(empty_function, nullptr, 1) {}
ScanTask::ScanTask() : ScanTask(empty_function, nullptr, nullptr, 1) {}
ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context,
int priority)
: scan_func(std::move(scan_func)), scanner_context(scanner_context), priority(priority) {}
TGSTEntityPtr scan_entity, int priority)
: scan_func(std::move(scan_func)),
scanner_context(scanner_context),
scan_entity(scan_entity),
priority(priority) {}
ScanTaskQueue::ScanTaskQueue() : _queue(config::doris_scanner_thread_pool_queue_size) {}
@ -98,7 +101,7 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
}
void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t time_spent) {
auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity();
auto* entity = scan_task.scan_entity;
std::unique_lock<std::mutex> lock(_rs_mutex);
auto find_entity = _group_entities.find(entity);
bool is_in_queue = find_entity != _group_entities.end();

View File

@ -33,7 +33,8 @@ static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
// Like PriorityThreadPool::Task
struct ScanTask {
ScanTask();
ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, int priority);
ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context,
TGSTEntityPtr scan_entity, int priority);
bool operator<(const ScanTask& o) const { return priority < o.priority; }
ScanTask& operator++() {
priority += 2;
@ -42,6 +43,7 @@ struct ScanTask {
WorkFunction scan_func;
vectorized::ScannerContext* scanner_context;
TGSTEntityPtr scan_entity;
int priority;
};

View File

@ -226,7 +226,9 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
auto work_func = [this, scanner = *iter, ctx] {
this->_scanner_scan(this, ctx, scanner);
};
taskgroup::ScanTask scan_task = {work_func, ctx, nice};
taskgroup::ScanTask scan_task = {
work_func, ctx, ctx->get_task_group()->local_scan_task_entity(),
nice};
ret = _task_group_local_scan_queue->push_back(scan_task);
} else {
PriorityThreadPool::Task task;