Congestion controller processing using delayed tasks.

Replacing Module based mechanism for processing with posting tasks.
This prepares for allowing the interval to be changed at runtime and
for removing the dependency on Module threads.

Bug: webrtc:8415
Change-Id: Iaad50466bec695be4ba26d8bd670a1981f2e0df4
Reviewed-on: https://webrtc-review.googlesource.com/60862
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#22406}
This commit is contained in:
Sebastian Jansson
2018-03-13 11:38:45 +01:00
committed by Commit Bot
parent 8a793a0b1b
commit 5f22be7cf8
3 changed files with 51 additions and 30 deletions

View File

@ -137,11 +137,21 @@ class SendSideCongestionController
void SetPacingFactor(float pacing_factor);
protected:
// Waits long enough that any outstanding tasks should be finished.
void WaitOnTasks();
// TODO(srte): The tests should be rewritten to not depend on internals and
// these functions should be removed.
// Post tasks that are normally delayed. This allows unit tests to trigger
// process updates immediately.
void PostDelayedTasksForTest();
// Waits for outstanding tasks to be finished. This allos unit tests to ensure
// that expected callbacks has be called.
void WaitOnTasksForTest();
private:
void MaybeCreateControllers();
void StartProcess() RTC_RUN_ON(task_queue_);
void ProcessTask();
void StartPacerQueueUpdate();
void PacerQueueUpdateTask();
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
void MaybeUpdateOutstandingData();
@ -166,11 +176,7 @@ class SendSideCongestionController
std::unique_ptr<NetworkControllerInterface> controller_
RTC_GUARDED_BY(task_queue_);
// TODO(srte): Review access constraints of these when introducing delayed
// tasks. Only accessed from process threads.
TimeDelta process_interval_;
// Only accessed from process threads.
int64_t last_process_update_ms_ = 0;
TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_);
std::map<uint32_t, RTCPReportBlock> last_report_blocks_
RTC_GUARDED_BY(task_queue_);

View File

@ -314,6 +314,7 @@ void SendSideCongestionController::MaybeCreateControllers() {
controller_ =
controller_factory_->Create(control_handler_.get(), initial_config_);
UpdateStreamsConfig();
StartProcess();
}
SendSideCongestionController::~SendSideCongestionController() {
@ -498,33 +499,40 @@ void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
}
int64_t SendSideCongestionController::TimeUntilNextProcess() {
const int kMaxProcessInterval = 60 * 1000;
if (process_interval_.IsInfinite())
return kMaxProcessInterval;
int64_t next_process_ms = last_process_update_ms_ + process_interval_.ms();
int64_t time_until_next_process =
next_process_ms - clock_->TimeInMilliseconds();
return std::max<int64_t>(time_until_next_process, 0);
// Using task queue to process, just sleep long to avoid wasting resources.
return 60 * 1000;
}
void SendSideCongestionController::Process() {
int64_t now_ms = clock_->TimeInMilliseconds();
last_process_update_ms_ = now_ms;
{
// Ignored, using task queue to process.
}
void SendSideCongestionController::StartProcess() {
task_queue_->PostDelayedTask(
[this]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
ProcessTask();
StartProcess();
},
process_interval_.ms());
}
void SendSideCongestionController::ProcessTask() {
RTC_DCHECK_RUN_ON(task_queue_.get());
if (controller_) {
ProcessInterval msg;
msg.at_time = Timestamp::ms(now_ms);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
if (controller_)
controller_->OnProcessInterval(msg);
});
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
controller_->OnProcessInterval(msg);
}
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(task_queue_.get());
}
void SendSideCongestionController::PacerQueueUpdateTask() {
RTC_DCHECK_RUN_ON(task_queue_.get());
if (control_handler_) {
PacerQueueUpdate msg;
msg.expected_queue_time = TimeDelta::ms(pacer_->ExpectedQueueTimeMs());
control_handler_->OnPacerQueueUpdate(msg);
});
}
}
void SendSideCongestionController::AddPacket(
@ -584,7 +592,14 @@ SendSideCongestionController::GetTransportFeedbackVector() const {
return transport_feedback_adapter_.GetTransportFeedbackVector();
}
void SendSideCongestionController::WaitOnTasks() {
void SendSideCongestionController::PostDelayedTasksForTest() {
task_queue_->PostTask([this]() {
ProcessTask();
PacerQueueUpdateTask();
});
}
void SendSideCongestionController::WaitOnTasksForTest() {
rtc::Event event(false, false);
task_queue_->PostTask([&event]() { event.Set(); });
event.Wait(rtc::Event::kForever);

View File

@ -48,10 +48,10 @@ class SendSideCongestionControllerForTest
public:
using SendSideCongestionController::SendSideCongestionController;
~SendSideCongestionControllerForTest() {}
using SendSideCongestionController::WaitOnTasks;
void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); }
void Process() override {
SendSideCongestionController::Process();
SendSideCongestionController::WaitOnTasks();
SendSideCongestionController::PostDelayedTasksForTest();
SendSideCongestionController::WaitOnTasksForTest();
}
};
} // namespace