Files
doris/be/src/vec/exec/scan/scan_task_queue.h
wangbo d3317aa33b [Fix](executor)Fix scan entity core #21696
After the last call to scan_task.scan_func(), the scan task should have finished, which means the PipelineFragmentContext may already have been released.
Once the PipelineFragmentContext has been released, accessing its fields, such as query_ctx or _state, may cause a core dump.
However, this can only explain core dump 2.

// Worker loop for the task-group scan scheduler: until `_is_closed` becomes true,
// take a ScanTask from `scan_queue`, run its scan_func() while timing it, and
// report the elapsed time back through update_statistics().
// NOTE(review): the `scheduler` parameter is not referenced in this body.
void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
                                                taskgroup::ScanTaskTaskGroupQueue* scan_queue) {
    while (!_is_closed) {
        taskgroup::ScanTask scan_task;
        // take() reports through its bool result whether scan_task was filled in;
        // on failure the loop just re-checks _is_closed and tries again.
        // NOTE(review): presumably take() blocks or times out internally so this
        // is not a busy spin — confirm.
        auto success = scan_queue->take(&scan_task);
        if (success) {
            int64_t time_spent = 0;
            {
                // The inner scope limits the timer to the scan_func() call only
                // (presumably SCOPED_RAW_TIMER writes the elapsed time into
                // time_spent when this scope ends — confirm unit).
                SCOPED_RAW_TIMER(&time_spent);
                scan_task.scan_func();
            }
            // NOTE(review): scan_task is still used after scan_func() has returned.
            // If that final scan_func() call allowed the owning
            // PipelineFragmentContext to be released, update_statistics() must not
            // reach anything owned by that context through scan_task — confirm.
            scan_queue->update_statistics(scan_task, time_spent);
        }
    }
}
2023-07-11 15:56:13 +08:00

101 lines
3.2 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "olap/tablet.h"
#include "runtime/task_group/task_group.h"
#include "util/blocking_priority_queue.hpp"
namespace doris {
// Forward declaration only; ScanTask stores a ScannerContext* and needs no definition.
namespace vectorized {
class ScannerContext;
} // namespace vectorized
namespace taskgroup {
// Type-erased unit of scan work stored in ScanTask::scan_func.
using WorkFunction = std::function<void()>;
// Timeout in milliseconds (per the _MS suffix).
// NOTE(review): presumably used as the timeout_ms argument of ScanTaskQueue::try_get — confirm.
// `inline` (C++17) gives a single entity across translation units instead of a
// separate internal-linkage copy per TU, as a header-scope `static` would.
inline constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
// A single unit of scan work submitted to the task-group scan queues
// (modelled on PriorityThreadPool::Task).
struct ScanTask {
    ScanTask();
    ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context,
             TGSTEntityPtr scan_entity, int priority);

    // Orders tasks by `priority` alone, so ScanTask can be held in a priority queue.
    bool operator<(const ScanTask& o) const { return priority < o.priority; }

    // Raises this task's priority by 2 and returns *this.
    // NOTE(review): presumably used to age a task each time it is re-queued — confirm with callers.
    ScanTask& operator++() {
        priority += 2;
        return *this;
    }

    // The work to execute.
    WorkFunction scan_func;
    // Raw pointer; ownership is not expressed here (presumably non-owning).
    // Defaults to nullptr unless a constructor sets it, so it is never indeterminate.
    vectorized::ScannerContext* scanner_context = nullptr;
    // Task-group entity this task belongs to; value-initialized unless a constructor sets it.
    TGSTEntityPtr scan_entity {};
    // Ordering key used by operator<; defaults to 0 unless a constructor sets it.
    int priority = 0;
};
// Queue of ScanTasks held in a BlockingPriorityQueue (presumably ordered via
// ScanTask::operator<, i.e. by priority).
// NOTE(review): the original author asked whether this should mirror
// pipeline::PriorityTaskQueue instead of using BlockingPriorityQueue directly.
class ScanTaskQueue {
public:
    ScanTaskQueue();

    // Attempts to enqueue `task`; the outcome is reported through the returned Status.
    Status try_push_back(ScanTask task);

    // Attempts to dequeue into *scan_task, presumably waiting up to `timeout_ms`;
    // returns whether a task was obtained.
    bool try_get(ScanTask* scan_task, uint32_t timeout_ms);

    // Current number of queued tasks, as reported by the underlying queue.
    int size() {
        return _queue.get_size();
    }

private:
    BlockingPriorityQueue<ScanTask> _queue;
};
// Like TaskGroupTaskQueue
class ScanTaskTaskGroupQueue {
public:
explicit ScanTaskTaskGroupQueue(size_t core_size);
~ScanTaskTaskGroupQueue();
void close();
bool take(ScanTask* scan_task);
bool push_back(ScanTask);
void update_statistics(ScanTask task, int64_t time_spent);
void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr);
private:
TGSTEntityPtr _task_entity(ScanTask& scan_task);
void _enqueue_task_group(TGSTEntityPtr);
void _dequeue_task_group(TGSTEntityPtr);
TGSTEntityPtr _next_tg_entity();
uint64_t _ideal_runtime_ns(TGSTEntityPtr tg_entity) const;
void _update_min_tg();
// Like cfs rb tree in sched_entity
struct TaskGroupSchedEntityComparator {
bool operator()(const taskgroup::TGSTEntityPtr&, const taskgroup::TGSTEntityPtr&) const;
};
using ResouceGroupSet = std::set<taskgroup::TGSTEntityPtr, TaskGroupSchedEntityComparator>;
ResouceGroupSet _group_entities;
std::condition_variable _wait_task;
std::mutex _rs_mutex;
bool _closed = false;
int _total_cpu_share = 0;
std::atomic<taskgroup::TGSTEntityPtr> _min_tg_entity = nullptr;
uint64_t _min_tg_v_runtime_ns = 0;
size_t _core_size;
};
} // namespace taskgroup
} // namespace doris