Update TaskQueueStdlib implementation to absl::AnyInvocable

Bug: webrtc:14245
Change-Id: Ic0c55cbb4dbdd31359bbe15f1acd7a2b7e9e61f7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268901
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37568}
This commit is contained in:
Danil Chapovalov
2022-07-19 18:36:31 +02:00
committed by WebRTC LUCI CQ
parent 1a84b565ac
commit ba5700171f
2 changed files with 52 additions and 43 deletions

View File

@ -712,6 +712,7 @@ rtc_library("rtc_task_queue_stdlib") {
] ]
deps = [ deps = [
":checks", ":checks",
":divide_round",
":logging", ":logging",
":macromagic", ":macromagic",
":platform_thread", ":platform_thread",
@ -719,9 +720,13 @@ rtc_library("rtc_task_queue_stdlib") {
":safe_conversions", ":safe_conversions",
":timeutils", ":timeutils",
"../api/task_queue", "../api/task_queue",
"../api/units:time_delta",
"synchronization:mutex", "synchronization:mutex",
] ]
absl_deps = [ "//third_party/abseil-cpp/absl/strings" ] absl_deps = [
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings",
]
} }
rtc_library("weak_ptr") { rtc_library("weak_ptr") {

View File

@ -18,12 +18,14 @@
#include <queue> #include <queue>
#include <utility> #include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/numerics/divide_round.h"
#include "rtc_base/platform_thread.h" #include "rtc_base/platform_thread.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
@ -50,27 +52,29 @@ class TaskQueueStdlib final : public TaskQueueBase {
~TaskQueueStdlib() override = default; ~TaskQueueStdlib() override = default;
void Delete() override; void Delete() override;
void PostTask(std::unique_ptr<QueuedTask> task) override; void PostTask(absl::AnyInvocable<void() &&> task) override;
void PostDelayedTask(std::unique_ptr<QueuedTask> task, void PostDelayedTask(absl::AnyInvocable<void() &&> task,
uint32_t milliseconds) override; TimeDelta delay) override;
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override;
private: private:
using OrderId = uint64_t; using OrderId = uint64_t;
struct DelayedEntryTimeout { struct DelayedEntryTimeout {
int64_t next_fire_at_ms_{}; int64_t next_fire_at_us{};
OrderId order_{}; OrderId order{};
bool operator<(const DelayedEntryTimeout& o) const { bool operator<(const DelayedEntryTimeout& o) const {
return std::tie(next_fire_at_ms_, order_) < return std::tie(next_fire_at_us, order) <
std::tie(o.next_fire_at_ms_, o.order_); std::tie(o.next_fire_at_us, o.order);
} }
}; };
struct NextTask { struct NextTask {
bool final_task_ = false; bool final_task = false;
std::unique_ptr<QueuedTask> run_task_; absl::AnyInvocable<void() &&> run_task;
int64_t sleep_time_ms_ = 0; int64_t sleep_time_ms = rtc::Event::kForever;
}; };
static rtc::PlatformThread InitializeThread(TaskQueueStdlib* me, static rtc::PlatformThread InitializeThread(TaskQueueStdlib* me,
@ -97,7 +101,7 @@ class TaskQueueStdlib final : public TaskQueueBase {
// The list of all pending tasks that need to be processed in the // The list of all pending tasks that need to be processed in the
// FIFO queue ordering on the worker thread. // FIFO queue ordering on the worker thread.
std::queue<std::pair<OrderId, std::unique_ptr<QueuedTask>>> pending_queue_ std::queue<std::pair<OrderId, absl::AnyInvocable<void() &&>>> pending_queue_
RTC_GUARDED_BY(pending_lock_); RTC_GUARDED_BY(pending_lock_);
// The list of all pending tasks that need to be processed at a future // The list of all pending tasks that need to be processed at a future
@ -105,8 +109,8 @@ class TaskQueueStdlib final : public TaskQueueBase {
// happen at exactly the same time interval as another task then the // happen at exactly the same time interval as another task then the
// task is processed based on FIFO ordering. std::priority_queue was // task is processed based on FIFO ordering. std::priority_queue was
// considered but rejected due to its inability to extract the // considered but rejected due to its inability to extract the
// std::unique_ptr out of the queue without the presence of a hack. // move-only value out of the queue without the presence of a hack.
std::map<DelayedEntryTimeout, std::unique_ptr<QueuedTask>> delayed_queue_ std::map<DelayedEntryTimeout, absl::AnyInvocable<void() &&>> delayed_queue_
RTC_GUARDED_BY(pending_lock_); RTC_GUARDED_BY(pending_lock_);
// Contains the active worker thread assigned to processing // Contains the active worker thread assigned to processing
@ -151,43 +155,45 @@ void TaskQueueStdlib::Delete() {
delete this; delete this;
} }
void TaskQueueStdlib::PostTask(std::unique_ptr<QueuedTask> task) { void TaskQueueStdlib::PostTask(absl::AnyInvocable<void() &&> task) {
{ {
MutexLock lock(&pending_lock_); MutexLock lock(&pending_lock_);
OrderId order = thread_posting_order_++; pending_queue_.push(
std::make_pair(++thread_posting_order_, std::move(task)));
pending_queue_.push(std::pair<OrderId, std::unique_ptr<QueuedTask>>(
order, std::move(task)));
} }
NotifyWake(); NotifyWake();
} }
void TaskQueueStdlib::PostDelayedTask(std::unique_ptr<QueuedTask> task, void TaskQueueStdlib::PostDelayedTask(absl::AnyInvocable<void() &&> task,
uint32_t milliseconds) { TimeDelta delay) {
const auto fire_at = rtc::TimeMillis() + milliseconds; DelayedEntryTimeout delayed_entry;
delayed_entry.next_fire_at_us = rtc::TimeMicros() + delay.us();
DelayedEntryTimeout delay;
delay.next_fire_at_ms_ = fire_at;
{ {
MutexLock lock(&pending_lock_); MutexLock lock(&pending_lock_);
delay.order_ = ++thread_posting_order_; delayed_entry.order = ++thread_posting_order_;
delayed_queue_[delay] = std::move(task); delayed_queue_[delayed_entry] = std::move(task);
} }
NotifyWake(); NotifyWake();
} }
void TaskQueueStdlib::PostDelayedHighPrecisionTask(
absl::AnyInvocable<void() &&> task,
TimeDelta delay) {
PostDelayedTask(std::move(task), delay);
}
TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() { TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
NextTask result; NextTask result;
const auto tick = rtc::TimeMillis(); const int64_t tick_us = rtc::TimeMicros();
MutexLock lock(&pending_lock_); MutexLock lock(&pending_lock_);
if (thread_should_quit_) { if (thread_should_quit_) {
result.final_task_ = true; result.final_task = true;
return result; return result;
} }
@ -195,29 +201,30 @@ TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
auto delayed_entry = delayed_queue_.begin(); auto delayed_entry = delayed_queue_.begin();
const auto& delay_info = delayed_entry->first; const auto& delay_info = delayed_entry->first;
auto& delay_run = delayed_entry->second; auto& delay_run = delayed_entry->second;
if (tick >= delay_info.next_fire_at_ms_) { if (tick_us >= delay_info.next_fire_at_us) {
if (pending_queue_.size() > 0) { if (pending_queue_.size() > 0) {
auto& entry = pending_queue_.front(); auto& entry = pending_queue_.front();
auto& entry_order = entry.first; auto& entry_order = entry.first;
auto& entry_run = entry.second; auto& entry_run = entry.second;
if (entry_order < delay_info.order_) { if (entry_order < delay_info.order) {
result.run_task_ = std::move(entry_run); result.run_task = std::move(entry_run);
pending_queue_.pop(); pending_queue_.pop();
return result; return result;
} }
} }
result.run_task_ = std::move(delay_run); result.run_task = std::move(delay_run);
delayed_queue_.erase(delayed_entry); delayed_queue_.erase(delayed_entry);
return result; return result;
} }
result.sleep_time_ms_ = delay_info.next_fire_at_ms_ - tick; result.sleep_time_ms =
DivideRoundUp(delay_info.next_fire_at_us - tick_us, 1'000);
} }
if (pending_queue_.size() > 0) { if (pending_queue_.size() > 0) {
auto& entry = pending_queue_.front(); auto& entry = pending_queue_.front();
result.run_task_ = std::move(entry.second); result.run_task = std::move(entry.second);
pending_queue_.pop(); pending_queue_.pop();
} }
@ -228,21 +235,18 @@ void TaskQueueStdlib::ProcessTasks() {
while (true) { while (true) {
auto task = GetNextTask(); auto task = GetNextTask();
if (task.final_task_) if (task.final_task)
break; break;
if (task.run_task_) { if (task.run_task) {
// process entry immediately then try again // process entry immediately then try again
QueuedTask* release_ptr = task.run_task_.release(); std::move(task.run_task)();
if (release_ptr->Run())
delete release_ptr;
// Attempt to run more tasks before going to sleep. // Attempt to run more tasks before going to sleep.
continue; continue;
} }
flag_notify_.Wait(0 == task.sleep_time_ms_ ? rtc::Event::kForever flag_notify_.Wait(task.sleep_time_ms);
: task.sleep_time_ms_);
} }
} }