// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // This file is copied from // https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/blocking-priority-queue.hpp // and modified by Doris #pragma once #include #include #include #include #include "common/config.h" #include "util/stopwatch.hpp" namespace doris { // Fixed capacity FIFO queue, where both blocking_get and blocking_put operations block // if the queue is empty or full, respectively. template class BlockingPriorityQueue { public: BlockingPriorityQueue(size_t max_elements) : _shutdown(false), _max_element(max_elements), _upgrade_counter(0), _total_get_wait_time(0), _total_put_wait_time(0), _get_waiting(0), _put_waiting(0) {} // Get an element from the queue, waiting indefinitely (or until timeout) for one to become available. // Returns false if we were shut down prior to getting the element, and there // are no more elements available. // -- timeout_ms: 0 means wait indefinitely bool blocking_get(T* out, uint32_t timeout_ms = 0) { MonotonicStopWatch timer; timer.start(); std::unique_lock unique_lock(_lock); bool wait_successful = false; if (timeout_ms > 0) { while (!(_shutdown || !_queue.empty())) { ++_get_waiting; if (_get_cv.wait_for(unique_lock, std::chrono::milliseconds(timeout_ms)) == std::cv_status::timeout) { // timeout wait_successful = _shutdown || !_queue.empty(); break; } } } else { while (!(_shutdown || !_queue.empty())) { ++_get_waiting; _get_cv.wait(unique_lock); } wait_successful = true; } _total_get_wait_time += timer.elapsed_time(); if (wait_successful) { if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) { std::priority_queue tmp_queue; while (!_queue.empty()) { T v = _queue.top(); _queue.pop(); ++v; tmp_queue.push(v); } swap(_queue, tmp_queue); _upgrade_counter = 0; } if (!_queue.empty()) { *out = _queue.top(); _queue.pop(); ++_upgrade_counter; if (_put_waiting > 0) { --_put_waiting; unique_lock.unlock(); _put_cv.notify_one(); } return true; } else { assert(_shutdown); return false; } } else { //time out assert(!_shutdown); return false; } } bool non_blocking_get(T* out) { MonotonicStopWatch timer; timer.start(); std::unique_lock unique_lock(_lock); if (!_queue.empty()) { // 定期提高队列中残留的任务优先级 // 保证优先级较低的大查询不至于完全饿死 if (_upgrade_counter > config::priority_queue_remaining_tasks_increased_frequency) { std::priority_queue tmp_queue; while (!_queue.empty()) { T v = _queue.top(); _queue.pop(); ++v; tmp_queue.push(v); } swap(_queue, tmp_queue); _upgrade_counter = 0; } *out = _queue.top(); _queue.pop(); ++_upgrade_counter; _total_get_wait_time += timer.elapsed_time(); if (_put_waiting > 0) { --_put_waiting; unique_lock.unlock(); _put_cv.notify_one(); } return true; } return false; } // Puts an element into the queue, waiting indefinitely until there is space. // If the queue is shut down, returns false. bool blocking_put(const T& val) { MonotonicStopWatch timer; timer.start(); std::unique_lock unique_lock(_lock); while (!(_shutdown || _queue.size() < _max_element)) { ++_put_waiting; _put_cv.wait(unique_lock); } _total_put_wait_time += timer.elapsed_time(); if (_shutdown) { return false; } _queue.push(val); if (_get_waiting > 0) { --_get_waiting; unique_lock.unlock(); _get_cv.notify_one(); } return true; } // Return false if queue full or has been shutdown. bool try_put(const T& val) { std::unique_lock unique_lock(_lock); if (_queue.size() < _max_element && !_shutdown) { _queue.push(val); if (_get_waiting > 0) { --_get_waiting; unique_lock.unlock(); _get_cv.notify_one(); } return true; } return false; } // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. void shutdown() { { std::lock_guard l(_lock); _shutdown = true; } _get_cv.notify_all(); _put_cv.notify_all(); } uint32_t get_size() const { std::lock_guard l(_lock); return _queue.size(); } uint32_t get_capacity() const { return _max_element; } // Returns the total amount of time threads have blocked in blocking_get. uint64_t total_get_wait_time() const { return _total_get_wait_time; } // Returns the total amount of time threads have blocked in blocking_put. uint64_t total_put_wait_time() const { return _total_put_wait_time; } private: bool _shutdown; const int _max_element; std::condition_variable _get_cv; // 'get' callers wait on this std::condition_variable _put_cv; // 'put' callers wait on this // _lock guards access to _queue, total_get_wait_time, and total_put_wait_time mutable std::mutex _lock; std::priority_queue _queue; int _upgrade_counter; std::atomic _total_get_wait_time; std::atomic _total_put_wait_time; size_t _get_waiting; size_t _put_waiting; }; } // namespace doris