Migrate pacing and video_coding to absl::AnyInvocable based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: Icfab3e6548055ea72a199a226eca5233b1ead20d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267983
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37467}
This commit is contained in:
Danil Chapovalov
2022-07-06 13:17:54 +02:00
committed by WebRTC LUCI CQ
parent f7b30e046e
commit 0be8eba07e
10 changed files with 38 additions and 72 deletions

View File

@ -38,7 +38,6 @@ rtc_library("pacing") {
"../../api:sequence_checker", "../../api:sequence_checker",
"../../api/rtc_event_log", "../../api/rtc_event_log",
"../../api/task_queue:task_queue", "../../api/task_queue:task_queue",
"../../api/task_queue:to_queued_task",
"../../api/transport:field_trial_based_config", "../../api/transport:field_trial_based_config",
"../../api/transport:network_control", "../../api/transport:network_control",
"../../api/units:data_rate", "../../api/units:data_rate",
@ -99,7 +98,6 @@ if (rtc_include_tests) {
":interval_budget", ":interval_budget",
":pacing", ":pacing",
"../../api/task_queue:task_queue", "../../api/task_queue:task_queue",
"../../api/task_queue:to_queued_task",
"../../api/transport:network_control", "../../api/transport:network_control",
"../../api/units:data_rate", "../../api/units:data_rate",
"../../api/units:time_delta", "../../api/units:time_delta",
@ -116,5 +114,6 @@ if (rtc_include_tests) {
"../rtp_rtcp:mock_rtp_rtcp", "../rtp_rtcp:mock_rtp_rtcp",
"../rtp_rtcp:rtp_rtcp_format", "../rtp_rtcp:rtp_rtcp_format",
] ]
absl_deps = [ "//third_party/abseil-cpp/absl/functional:any_invocable" ]
} }
} }

View File

@ -316,11 +316,10 @@ void TaskQueuePacedSender::MaybeProcessPackets(
} }
} }
task_queue_.PostDelayedTaskWithPrecision( task_queue_.Get()->PostDelayedTaskWithPrecision(
precision, ToQueuedTask([this, next_send_time]() { precision,
MaybeProcessPackets(next_send_time); [this, next_send_time]() { MaybeProcessPackets(next_send_time); },
}), time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
time_to_next_process.RoundUpTo(TimeDelta::Millis(1)).ms<uint32_t>());
next_process_time_ = next_send_time; next_process_time_ = next_send_time;
} }
} }

View File

@ -18,8 +18,8 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "absl/functional/any_invocable.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
#include "api/transport/network_types.h" #include "api/transport/network_types.h"
#include "api/units/data_rate.h" #include "api/units/data_rate.h"
#include "modules/pacing/packet_router.h" #include "modules/pacing/packet_router.h"
@ -119,36 +119,27 @@ class TaskQueueWithFakePrecisionFactory : public TaskQueueFactory {
// TaskQueueDeleter. // TaskQueueDeleter.
delete this; delete this;
} }
void PostTask(std::unique_ptr<QueuedTask> task) override { void PostTask(absl::AnyInvocable<void() &&> task) override {
task_queue_->PostTask( task_queue_->PostTask(WrapTask(std::move(task)));
ToQueuedTask([this, task = std::move(task)]() mutable {
RunTask(std::move(task));
}));
} }
void PostDelayedTask(std::unique_ptr<QueuedTask> task, void PostDelayedTask(absl::AnyInvocable<void() &&> task,
uint32_t milliseconds) override { TimeDelta delay) override {
++parent_factory_->delayed_low_precision_count_; ++parent_factory_->delayed_low_precision_count_;
task_queue_->PostDelayedTask( task_queue_->PostDelayedTask(WrapTask(std::move(task)), delay);
ToQueuedTask([this, task = std::move(task)]() mutable {
RunTask(std::move(task));
}),
milliseconds);
} }
void PostDelayedHighPrecisionTask(std::unique_ptr<QueuedTask> task, void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
uint32_t milliseconds) override { TimeDelta delay) override {
++parent_factory_->delayed_high_precision_count_; ++parent_factory_->delayed_high_precision_count_;
task_queue_->PostDelayedHighPrecisionTask( task_queue_->PostDelayedHighPrecisionTask(WrapTask(std::move(task)),
ToQueuedTask([this, task = std::move(task)]() mutable { delay);
RunTask(std::move(task));
}),
milliseconds);
} }
private: private:
void RunTask(std::unique_ptr<QueuedTask> task) { absl::AnyInvocable<void() &&> WrapTask(absl::AnyInvocable<void() &&> task) {
CurrentTaskQueueSetter set_current(this); return [this, task = std::move(task)]() mutable {
if (!task->Run()) CurrentTaskQueueSetter set_current(this);
task.release(); std::move(task)();
};
} }
TaskQueueWithFakePrecisionFactory* parent_factory_; TaskQueueWithFakePrecisionFactory* parent_factory_;

View File

@ -223,7 +223,6 @@ rtc_library("video_coding") {
"../../api:scoped_refptr", "../../api:scoped_refptr",
"../../api:sequence_checker", "../../api:sequence_checker",
"../../api/task_queue", "../../api/task_queue",
"../../api/task_queue:to_queued_task",
"../../api/units:data_rate", "../../api/units:data_rate",
"../../api/units:data_size", "../../api/units:data_size",
"../../api/units:frequency", "../../api/units:frequency",
@ -423,7 +422,7 @@ rtc_library("video_coding_utility") {
"../../api:field_trials_view", "../../api:field_trials_view",
"../../api:scoped_refptr", "../../api:scoped_refptr",
"../../api:sequence_checker", "../../api:sequence_checker",
"../../api/task_queue:to_queued_task", "../../api/units:time_delta",
"../../api/video:encoded_frame", "../../api/video:encoded_frame",
"../../api/video:encoded_image", "../../api/video:encoded_image",
"../../api/video:video_adaptation", "../../api/video:video_adaptation",
@ -856,7 +855,6 @@ if (rtc_include_tests) {
"../../api:sequence_checker", "../../api:sequence_checker",
"../../api:videocodec_test_fixture_api", "../../api:videocodec_test_fixture_api",
"../../api/task_queue", "../../api/task_queue",
"../../api/task_queue:to_queued_task",
"../../api/video:builtin_video_bitrate_allocator_factory", "../../api/video:builtin_video_bitrate_allocator_factory",
"../../api/video:encoded_image", "../../api/video:encoded_image",
"../../api/video:video_bitrate_allocation", "../../api/video:video_bitrate_allocation",

View File

@ -19,7 +19,6 @@
#include <utility> #include <utility>
#include "api/scoped_refptr.h" #include "api/scoped_refptr.h"
#include "api/task_queue/to_queued_task.h"
#include "api/video/builtin_video_bitrate_allocator_factory.h" #include "api/video/builtin_video_bitrate_allocator_factory.h"
#include "api/video/i420_buffer.h" #include "api/video/i420_buffer.h"
#include "api/video/video_bitrate_allocator_factory.h" #include "api/video/video_bitrate_allocator_factory.h"
@ -340,9 +339,9 @@ int32_t VideoProcessor::VideoProcessorDecodeCompleteCallback::Decoded(
.build(); .build();
copy.set_timestamp(image.timestamp()); copy.set_timestamp(image.timestamp());
task_queue_->PostTask(ToQueuedTask([this, copy]() { task_queue_->PostTask([this, copy]() {
video_processor_->FrameDecoded(copy, simulcast_svc_idx_); video_processor_->FrameDecoded(copy, simulcast_svc_idx_);
})); });
return 0; return 0;
} }
video_processor_->FrameDecoded(image, simulcast_svc_idx_); video_processor_->FrameDecoded(image, simulcast_svc_idx_);

View File

@ -21,7 +21,6 @@
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "api/sequence_checker.h" #include "api/sequence_checker.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/test/videocodec_test_fixture.h" #include "api/test/videocodec_test_fixture.h"
#include "api/video/encoded_image.h" #include "api/video/encoded_image.h"
@ -105,8 +104,11 @@ class VideoProcessor {
// Post the callback to the right task queue, if needed. // Post the callback to the right task queue, if needed.
if (!task_queue_->IsCurrent()) { if (!task_queue_->IsCurrent()) {
task_queue_->PostTask(std::make_unique<EncodeCallbackTask>( VideoProcessor* video_processor = video_processor_;
video_processor_, encoded_image, codec_specific_info)); task_queue_->PostTask([video_processor, encoded_image,
codec_specific_info = *codec_specific_info] {
video_processor->FrameEncoded(encoded_image, codec_specific_info);
});
return Result(Result::OK, 0); return Result(Result::OK, 0);
} }
@ -115,27 +117,6 @@ class VideoProcessor {
} }
private: private:
class EncodeCallbackTask : public QueuedTask {
public:
EncodeCallbackTask(VideoProcessor* video_processor,
const webrtc::EncodedImage& encoded_image,
const webrtc::CodecSpecificInfo* codec_specific_info)
: video_processor_(video_processor),
encoded_image_(encoded_image),
codec_specific_info_(*codec_specific_info) {
}
bool Run() override {
video_processor_->FrameEncoded(encoded_image_, codec_specific_info_);
return true;
}
private:
VideoProcessor* const video_processor_;
webrtc::EncodedImage encoded_image_;
const webrtc::CodecSpecificInfo codec_specific_info_;
};
VideoProcessor* const video_processor_; VideoProcessor* const video_processor_;
TaskQueueBase* const task_queue_; TaskQueueBase* const task_queue_;
}; };

View File

@ -221,7 +221,7 @@ int NackRequester::OnReceivedPacket(uint16_t seq_num,
void NackRequester::ClearUpTo(uint16_t seq_num) { void NackRequester::ClearUpTo(uint16_t seq_num) {
// Called via RtpVideoStreamReceiver2::FrameContinuous on the network thread. // Called via RtpVideoStreamReceiver2::FrameContinuous on the network thread.
worker_thread_->PostTask(ToQueuedTask(task_safety_, [seq_num, this]() { worker_thread_->PostTask(SafeTask(task_safety_.flag(), [seq_num, this]() {
RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK_RUN_ON(worker_thread_);
nack_list_.erase(nack_list_.begin(), nack_list_.lower_bound(seq_num)); nack_list_.erase(nack_list_.begin(), nack_list_.lower_bound(seq_num));
keyframe_list_.erase(keyframe_list_.begin(), keyframe_list_.erase(keyframe_list_.begin(),

View File

@ -59,12 +59,12 @@ class TestNackRequester : public ::testing::Test,
RTC_DCHECK(!waiting_for_send_nack_); RTC_DCHECK(!waiting_for_send_nack_);
waiting_for_send_nack_ = true; waiting_for_send_nack_ = true;
loop_.PostDelayedTask( loop_.task_queue()->PostDelayedTask(
[this]() { [this]() {
timed_out_ = true; timed_out_ = true;
loop_.Quit(); loop_.Quit();
}, },
1000); TimeDelta::Seconds(1));
loop_.Run(); loop_.Run();

View File

@ -15,7 +15,6 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include "api/task_queue/to_queued_task.h"
#include "api/video/video_adaptation_reason.h" #include "api/video/video_adaptation_reason.h"
#include "api/video_codecs/video_encoder.h" #include "api/video_codecs/video_encoder.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
@ -58,7 +57,7 @@ BandwidthQualityScaler::~BandwidthQualityScaler() {
void BandwidthQualityScaler::StartCheckForBitrate() { void BandwidthQualityScaler::StartCheckForBitrate() {
RTC_DCHECK_RUN_ON(&task_checker_); RTC_DCHECK_RUN_ON(&task_checker_);
TaskQueueBase::Current()->PostDelayedTask( TaskQueueBase::Current()->PostDelayedTask(
ToQueuedTask([this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), this] { [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), this] {
if (!this_weak_ptr) { if (!this_weak_ptr) {
// The caller BandwidthQualityScaler has been deleted. // The caller BandwidthQualityScaler has been deleted.
return; return;
@ -84,8 +83,8 @@ void BandwidthQualityScaler::StartCheckForBitrate() {
} }
} }
StartCheckForBitrate(); StartCheckForBitrate();
}), },
kBitrateStateUpdateInterval.ms()); kBitrateStateUpdateInterval);
} }
void BandwidthQualityScaler::ReportEncodeInfo(int frame_size_bytes, void BandwidthQualityScaler::ReportEncodeInfo(int frame_size_bytes,

View File

@ -13,7 +13,7 @@
#include <memory> #include <memory>
#include <utility> #include <utility>
#include "api/task_queue/to_queued_task.h" #include "api/units/time_delta.h"
#include "api/video/video_adaptation_reason.h" #include "api/video/video_adaptation_reason.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/experiments/quality_scaler_settings.h" #include "rtc_base/experiments/quality_scaler_settings.h"
@ -97,7 +97,7 @@ class QualityScaler::CheckQpTask {
RTC_DCHECK_EQ(state_, State::kNotStarted); RTC_DCHECK_EQ(state_, State::kNotStarted);
state_ = State::kCheckingQp; state_ = State::kCheckingQp;
TaskQueueBase::Current()->PostDelayedTask( TaskQueueBase::Current()->PostDelayedTask(
ToQueuedTask([this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), this] { [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), this] {
if (!this_weak_ptr) { if (!this_weak_ptr) {
// The task has been cancelled through destruction. // The task has been cancelled through destruction.
return; return;
@ -134,8 +134,8 @@ class QualityScaler::CheckQpTask {
// Starting the next task deletes the pending task. After this line, // Starting the next task deletes the pending task. After this line,
// `this` has been deleted. // `this` has been deleted.
quality_scaler_->StartNextCheckQpTask(); quality_scaler_->StartNextCheckQpTask();
}), },
GetCheckingQpDelayMs()); TimeDelta::Millis(GetCheckingQpDelayMs()));
} }
bool HasCompletedTask() const { return state_ == State::kCompleted; } bool HasCompletedTask() const { return state_ == State::kCompleted; }