diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h index 91f7117f7e..1bcc543c2e 100644 --- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h +++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h @@ -137,11 +137,21 @@ class SendSideCongestionController void SetPacingFactor(float pacing_factor); protected: - // Waits long enough that any outstanding tasks should be finished. - void WaitOnTasks(); + // TODO(srte): The tests should be rewritten to not depend on internals and + // these functions should be removed. + // Post tasks that are normally delayed. This allows unit tests to trigger + // process updates immediately. + void PostDelayedTasksForTest(); + // Waits for outstanding tasks to be finished. This allos unit tests to ensure + // that expected callbacks has be called. + void WaitOnTasksForTest(); private: void MaybeCreateControllers(); + void StartProcess() RTC_RUN_ON(task_queue_); + void ProcessTask(); + void StartPacerQueueUpdate(); + void PacerQueueUpdateTask(); void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void MaybeUpdateOutstandingData(); @@ -166,11 +176,7 @@ class SendSideCongestionController std::unique_ptr controller_ RTC_GUARDED_BY(task_queue_); - // TODO(srte): Review access constraints of these when introducing delayed - // tasks. Only accessed from process threads. - TimeDelta process_interval_; - // Only accessed from process threads. - int64_t last_process_update_ms_ = 0; + TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_); std::map last_report_blocks_ RTC_GUARDED_BY(task_queue_); diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index 2901321048..6b69c9c2f8 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -314,6 +314,7 @@ void SendSideCongestionController::MaybeCreateControllers() { controller_ = controller_factory_->Create(control_handler_.get(), initial_config_); UpdateStreamsConfig(); + StartProcess(); } SendSideCongestionController::~SendSideCongestionController() { @@ -498,33 +499,40 @@ void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, } int64_t SendSideCongestionController::TimeUntilNextProcess() { - const int kMaxProcessInterval = 60 * 1000; - if (process_interval_.IsInfinite()) - return kMaxProcessInterval; - int64_t next_process_ms = last_process_update_ms_ + process_interval_.ms(); - int64_t time_until_next_process = - next_process_ms - clock_->TimeInMilliseconds(); - return std::max(time_until_next_process, 0); + // Using task queue to process, just sleep long to avoid wasting resources. + return 60 * 1000; } void SendSideCongestionController::Process() { - int64_t now_ms = clock_->TimeInMilliseconds(); - last_process_update_ms_ = now_ms; - { + // Ignored, using task queue to process. +} + +void SendSideCongestionController::StartProcess() { + task_queue_->PostDelayedTask( + [this]() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + ProcessTask(); + StartProcess(); + }, + process_interval_.ms()); +} + +void SendSideCongestionController::ProcessTask() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (controller_) { ProcessInterval msg; - msg.at_time = Timestamp::ms(now_ms); - task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_.get()); - if (controller_) - controller_->OnProcessInterval(msg); - }); + msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); + controller_->OnProcessInterval(msg); } - task_queue_->PostTask([this]() { - RTC_DCHECK_RUN_ON(task_queue_.get()); +} + +void SendSideCongestionController::PacerQueueUpdateTask() { + RTC_DCHECK_RUN_ON(task_queue_.get()); + if (control_handler_) { PacerQueueUpdate msg; msg.expected_queue_time = TimeDelta::ms(pacer_->ExpectedQueueTimeMs()); control_handler_->OnPacerQueueUpdate(msg); - }); + } } void SendSideCongestionController::AddPacket( @@ -584,7 +592,14 @@ SendSideCongestionController::GetTransportFeedbackVector() const { return transport_feedback_adapter_.GetTransportFeedbackVector(); } -void SendSideCongestionController::WaitOnTasks() { +void SendSideCongestionController::PostDelayedTasksForTest() { + task_queue_->PostTask([this]() { + ProcessTask(); + PacerQueueUpdateTask(); + }); +} + +void SendSideCongestionController::WaitOnTasksForTest() { rtc::Event event(false, false); task_queue_->PostTask([&event]() { event.Set(); }); event.Wait(rtc::Event::kForever); diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc index a982c74216..e27c559b8c 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc @@ -48,10 +48,10 @@ class SendSideCongestionControllerForTest public: using SendSideCongestionController::SendSideCongestionController; ~SendSideCongestionControllerForTest() {} - using SendSideCongestionController::WaitOnTasks; + void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); } void Process() override { - SendSideCongestionController::Process(); - SendSideCongestionController::WaitOnTasks(); + SendSideCongestionController::PostDelayedTasksForTest(); + SendSideCongestionController::WaitOnTasksForTest(); } }; } // namespace