Ensure FIFO order for delayed tasks in ProcessThreadImpl

TaskQueue posts delayed task in milliseconds precision. If delayed tasks
have the same wakeup time in queue, we should ensure they are waked up
in FIFO order. E.g., call `PostDelayedTask(task-i, 0)` in a loop, we
expect `task-i` is waked up as enqueue order.

Co-Author: jiahe.zhang@intel.com
Bug: webrtc:13761
Change-Id: I3bc87c2d251f8dffee868a012e828fd42e783afc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/251960
Reviewed-by: Chen Xing <chxg@google.com>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36582}
This commit is contained in:
Jianhui Dai
2022-03-09 11:06:34 +08:00
committed by WebRTC LUCI CQ
parent 0c68a7aaa7
commit af0a6b34e3
2 changed files with 15 additions and 4 deletions

View File

@ -171,7 +171,7 @@ void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
MutexLock lock(&mutex_);
recalculate_wakeup_time =
delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
delayed_tasks_.emplace(run_at_ms, std::move(task));
delayed_tasks_.emplace(run_at_ms, sequence_id_++, std::move(task));
}
if (recalculate_wakeup_time) {
wake_up_.Set();

View File

@ -65,14 +65,22 @@ class ProcessThreadImpl : public ProcessThread {
ModuleCallback& operator=(ModuleCallback&);
};
struct DelayedTask {
DelayedTask(int64_t run_at_ms, std::unique_ptr<QueuedTask> task)
: run_at_ms(run_at_ms), task(task.release()) {}
DelayedTask(int64_t run_at_ms,
uint64_t sequence_id,
std::unique_ptr<QueuedTask> task)
: run_at_ms(run_at_ms),
sequence_id_(sequence_id),
task(task.release()) {}
friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) {
// Earliest DelayedTask should be at the top of the priority queue.
return lhs.run_at_ms > rhs.run_at_ms;
if (lhs.run_at_ms != rhs.run_at_ms) {
return lhs.run_at_ms > rhs.run_at_ms;
}
return lhs.sequence_id_ > rhs.sequence_id_;
}
int64_t run_at_ms;
uint64_t sequence_id_;
// DelayedTask owns the `task`, but some delayed tasks must be removed from
// the std::priority_queue, but mustn't be deleted. std::priority_queue does
// not give non-const access to the values, so storing unique_ptr would
@ -101,7 +109,10 @@ class ProcessThreadImpl : public ProcessThread {
// Set to true when calling Process, to allow reentrant calls to WakeUp.
bool holds_mutex_ RTC_GUARDED_BY(this) = false;
std::queue<QueuedTask*> queue_;
// `std::priority_queue` does not guarantee stable sort. For delayed tasks
// with the same wakeup time, use `sequence_id_` to ensure FIFO ordering.
std::priority_queue<DelayedTask> delayed_tasks_ RTC_GUARDED_BY(mutex_);
uint64_t sequence_id_ RTC_GUARDED_BY(mutex_) = 0;
// The `stop_` flag is modified only by the construction thread, protected by
// `thread_checker_`. It is read also by the spawned `thread_`. The latter
// thread must take `mutex_` before access, and for thread safety, the