Cleanup of RtpTransportControllerSend.

This CL simplifies a lot of code that can be cleaned up after the merge
of RtpTransportControllerSend and SendSideCongestionController.

In particular, the role of CongestionControlHandler is reduced to only
handle the pacer pushback and stream pausing mechanism.

Bug: webrtc:9586
Change-Id: Idbc1e968efd35e6df6129bc307f6bc1db18d20f2
Reviewed-on: https://webrtc-review.googlesource.com/c/113947
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25994}
This commit is contained in:
Sebastian Jansson
2018-12-12 16:49:10 +01:00
committed by Commit Bot
parent 806e06d136
commit 1618095100
5 changed files with 116 additions and 220 deletions

View File

@ -132,7 +132,6 @@ RtpTransportControllerSend::RtpTransportControllerSend(
webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")), webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")),
transport_overhead_bytes_per_packet_(0), transport_overhead_bytes_per_packet_(0),
network_available_(false), network_available_(false),
periodic_tasks_enabled_(true),
packet_feedback_available_(false), packet_feedback_available_(false),
pacer_queue_update_task_(nullptr), pacer_queue_update_task_(nullptr),
controller_task_(nullptr), controller_task_(nullptr),
@ -187,25 +186,15 @@ void RtpTransportControllerSend::DestroyRtpVideoSender(
video_rtp_senders_.erase(it); video_rtp_senders_.erase(it);
} }
void RtpTransportControllerSend::OnNetworkChanged(uint32_t bitrate_bps, void RtpTransportControllerSend::UpdateControlState() {
uint8_t fraction_loss, absl::optional<TargetTransferRate> update = control_handler_->GetUpdate();
int64_t rtt_ms, if (!update)
int64_t probing_interval_ms) { return;
TargetTransferRate msg; retransmission_rate_limiter_.SetMaxRate(
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); update->network_estimate.bandwidth.bps());
msg.target_rate = DataRate::bps(bitrate_bps); // We won't create control_handler_ until we have an observers.
// TODO(srte): Remove this interface and push information about bandwidth
// estimation to users of this class, thereby reducing synchronous calls.
RTC_DCHECK_RUN_ON(&task_queue_);
RTC_DCHECK(control_handler_->last_transfer_rate().has_value());
msg.network_estimate =
control_handler_->last_transfer_rate()->network_estimate;
retransmission_rate_limiter_.SetMaxRate(msg.network_estimate.bandwidth.bps());
// We won't register as observer until we have an observers.
RTC_DCHECK(observer_ != nullptr); RTC_DCHECK(observer_ != nullptr);
observer_->OnTargetTransferRate(msg); observer_->OnTargetTransferRate(*update);
} }
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() { rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
@ -323,7 +312,7 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
task_queue_.PostTask([this, msg] { task_queue_.PostTask([this, msg] {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) { if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg)); PostUpdates(controller_->OnNetworkRouteChange(msg));
} else { } else {
UpdateInitialConstraints(msg.constraints); UpdateInitialConstraints(msg.constraints);
} }
@ -339,10 +328,20 @@ void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
msg.network_available = network_available; msg.network_available = network_available;
task_queue_.PostTask([this, msg]() { task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
if (network_available_ == msg.network_available)
return;
network_available_ = msg.network_available; network_available_ = msg.network_available;
if (network_available_) {
pacer_.Resume();
} else {
pacer_.Pause();
}
pacer_.UpdateOutstandingData(0);
if (controller_) { if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg)); control_handler_->SetNetworkAvailability(network_available_);
control_handler_->OnNetworkAvailability(msg); PostUpdates(controller_->OnNetworkAvailability(msg));
UpdateControlState();
} else { } else {
MaybeCreateControllers(); MaybeCreateControllers();
} }
@ -382,10 +381,11 @@ void RtpTransportControllerSend::OnSentPacket(
task_queue_.PostTask([this, packet_msg]() { task_queue_.PostTask([this, packet_msg]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) if (controller_)
control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg)); PostUpdates(controller_->OnSentPacket(*packet_msg));
}); });
} }
MaybeUpdateOutstandingData(); pacer_.UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData().bytes());
} }
void RtpTransportControllerSend::SetSdpBitrateParameters( void RtpTransportControllerSend::SetSdpBitrateParameters(
@ -397,8 +397,7 @@ void RtpTransportControllerSend::SetSdpBitrateParameters(
task_queue_.PostTask([this, msg]() { task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) { if (controller_) {
control_handler_->PostUpdates( PostUpdates(controller_->OnTargetRateConstraints(msg));
controller_->OnTargetRateConstraints(msg));
} else { } else {
UpdateInitialConstraints(msg); UpdateInitialConstraints(msg);
} }
@ -419,8 +418,7 @@ void RtpTransportControllerSend::SetClientBitratePreferences(
task_queue_.PostTask([this, msg]() { task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) { if (controller_) {
control_handler_->PostUpdates( PostUpdates(controller_->OnTargetRateConstraints(msg));
controller_->OnTargetRateConstraints(msg));
} else { } else {
UpdateInitialConstraints(msg); UpdateInitialConstraints(msg);
} }
@ -470,7 +468,7 @@ void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
task_queue_.PostTask([this, msg]() { task_queue_.PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) if (controller_)
control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg)); PostUpdates(controller_->OnRemoteBitrateReport(msg));
}); });
} }
@ -490,7 +488,7 @@ void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
report.round_trip_time = TimeDelta::ms(rtt_ms); report.round_trip_time = TimeDelta::ms(rtt_ms);
report.smoothed = false; report.smoothed = false;
if (controller_) if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report)); PostUpdates(controller_->OnRoundTripTimeUpdate(report));
}); });
} }
@ -515,11 +513,11 @@ void RtpTransportControllerSend::OnTransportFeedback(
task_queue_.PostTask([this, feedback_msg]() { task_queue_.PostTask([this, feedback_msg]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) if (controller_)
control_handler_->PostUpdates( PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
controller_->OnTransportPacketsFeedback(*feedback_msg));
}); });
} }
MaybeUpdateOutstandingData(); pacer_.UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData().bytes());
} }
void RtpTransportControllerSend::OnRttUpdate(int64_t avg_rtt_ms, void RtpTransportControllerSend::OnRttUpdate(int64_t avg_rtt_ms,
@ -532,7 +530,7 @@ void RtpTransportControllerSend::OnRttUpdate(int64_t avg_rtt_ms,
task_queue_.PostTask([this, report]() { task_queue_.PostTask([this, report]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
if (controller_) if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report)); PostUpdates(controller_->OnRoundTripTimeUpdate(report));
}); });
} }
@ -542,7 +540,7 @@ void RtpTransportControllerSend::MaybeCreateControllers() {
if (!network_available_ || !observer_) if (!network_available_ || !observer_)
return; return;
control_handler_ = absl::make_unique<CongestionControlHandler>(this, &pacer_); control_handler_ = absl::make_unique<CongestionControlHandler>();
initial_config_.constraints.at_time = initial_config_.constraints.at_time =
Timestamp::ms(clock_->TimeInMilliseconds()); Timestamp::ms(clock_->TimeInMilliseconds());
@ -571,13 +569,14 @@ void RtpTransportControllerSend::UpdateInitialConstraints(
} }
void RtpTransportControllerSend::StartProcessPeriodicTasks() { void RtpTransportControllerSend::StartProcessPeriodicTasks() {
if (!periodic_tasks_enabled_)
return;
if (!pacer_queue_update_task_) { if (!pacer_queue_update_task_) {
pacer_queue_update_task_ = pacer_queue_update_task_ =
StartPeriodicTask(&task_queue_, PacerQueueUpdateIntervalMs, [this]() { StartPeriodicTask(&task_queue_, PacerQueueUpdateIntervalMs, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
UpdatePacerQueue(); TimeDelta expected_queue_time =
TimeDelta::ms(pacer_.ExpectedQueueTimeMs());
control_handler_->SetPacerQueue(expected_queue_time);
UpdateControlState();
}); });
} }
if (controller_task_) { if (controller_task_) {
@ -599,34 +598,37 @@ void RtpTransportControllerSend::StartProcessPeriodicTasks() {
} }
void RtpTransportControllerSend::UpdateControllerWithTimeInterval() { void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
if (controller_) { RTC_DCHECK(controller_);
ProcessInterval msg; ProcessInterval msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
control_handler_->PostUpdates(controller_->OnProcessInterval(msg)); PostUpdates(controller_->OnProcessInterval(msg));
}
}
void RtpTransportControllerSend::UpdatePacerQueue() {
if (control_handler_) {
TimeDelta expected_queue_time = TimeDelta::ms(pacer_.ExpectedQueueTimeMs());
control_handler_->OnPacerQueueUpdate(expected_queue_time);
}
}
void RtpTransportControllerSend::MaybeUpdateOutstandingData() {
DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData();
task_queue_.PostTask([this, in_flight_data]() {
RTC_DCHECK_RUN_ON(&task_queue_);
if (control_handler_)
control_handler_->OnOutstandingData(in_flight_data);
});
} }
void RtpTransportControllerSend::UpdateStreamsConfig() { void RtpTransportControllerSend::UpdateStreamsConfig() {
streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
if (controller_) if (controller_)
control_handler_->PostUpdates( PostUpdates(controller_->OnStreamsConfig(streams_config_));
controller_->OnStreamsConfig(streams_config_)); }
void RtpTransportControllerSend::PostUpdates(NetworkControlUpdate update) {
if (update.congestion_window) {
if (update.congestion_window->IsFinite())
pacer_.SetCongestionWindow(update.congestion_window->bytes());
else
pacer_.SetCongestionWindow(PacedSender::kNoCongestionWindow);
}
if (update.pacer_config) {
pacer_.SetPacingRates(update.pacer_config->data_rate().bps(),
update.pacer_config->pad_rate().bps());
}
for (const auto& probe : update.probe_cluster_configs) {
int64_t bitrate_bps = probe.target_data_rate.bps();
pacer_.CreateProbeCluster(bitrate_bps);
}
if (update.target_rate) {
control_handler_->SetTargetRate(*update.target_rate);
UpdateControlState();
}
} }
void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks( void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks(
@ -669,7 +671,7 @@ void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks(
msg.start_time = last_report_block_time_; msg.start_time = last_report_block_time_;
msg.end_time = now; msg.end_time = now;
if (controller_) if (controller_)
control_handler_->PostUpdates(controller_->OnTransportLossReport(msg)); PostUpdates(controller_->OnTransportLossReport(msg));
last_report_block_time_ = now; last_report_block_time_ = now;
} }

View File

@ -21,7 +21,6 @@
#include "call/rtp_bitrate_configurator.h" #include "call/rtp_bitrate_configurator.h"
#include "call/rtp_transport_controller_send_interface.h" #include "call/rtp_transport_controller_send_interface.h"
#include "call/rtp_video_sender.h" #include "call/rtp_video_sender.h"
#include "modules/congestion_controller/include/network_changed_observer.h"
#include "modules/congestion_controller/rtp/control_handler.h" #include "modules/congestion_controller/rtp/control_handler.h"
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h" #include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
#include "modules/pacing/packet_router.h" #include "modules/pacing/packet_router.h"
@ -41,7 +40,6 @@ class RtcEventLog;
// per transport, sharing the same congestion controller. // per transport, sharing the same congestion controller.
class RtpTransportControllerSend final class RtpTransportControllerSend final
: public RtpTransportControllerSendInterface, : public RtpTransportControllerSendInterface,
public NetworkChangedObserver,
public RtcpBandwidthObserver, public RtcpBandwidthObserver,
public CallStatsObserver, public CallStatsObserver,
public TransportFeedbackObserver { public TransportFeedbackObserver {
@ -68,12 +66,6 @@ class RtpTransportControllerSend final
void DestroyRtpVideoSender( void DestroyRtpVideoSender(
RtpVideoSenderInterface* rtp_video_sender) override; RtpVideoSenderInterface* rtp_video_sender) override;
// Implements NetworkChangedObserver interface.
void OnNetworkChanged(uint32_t bitrate_bps,
uint8_t fraction_loss,
int64_t rtt_ms,
int64_t probing_interval_ms) override;
// Implements RtpTransportControllerSendInterface // Implements RtpTransportControllerSendInterface
rtc::TaskQueue* GetWorkerQueue() override; rtc::TaskQueue* GetWorkerQueue() override;
PacketRouter* packet_router() override; PacketRouter* packet_router() override;
@ -139,13 +131,13 @@ class RtpTransportControllerSend final
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_); void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_); void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_); void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
void MaybeUpdateOutstandingData();
void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks, void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
int64_t now_ms) int64_t now_ms)
RTC_RUN_ON(task_queue_); RTC_RUN_ON(task_queue_);
void PostUpdates(NetworkControlUpdate update) RTC_RUN_ON(task_queue_);
void UpdateControlState() RTC_RUN_ON(task_queue_);
const Clock* const clock_; const Clock* const clock_;
PacketRouter packet_router_; PacketRouter packet_router_;
@ -188,7 +180,6 @@ class RtpTransportControllerSend final
// TODO(srte): Remove atomic when feedback adapter runs on task queue. // TODO(srte): Remove atomic when feedback adapter runs on task queue.
std::atomic<size_t> transport_overhead_bytes_per_packet_; std::atomic<size_t> transport_overhead_bytes_per_packet_;
bool network_available_ RTC_GUARDED_BY(task_queue_); bool network_available_ RTC_GUARDED_BY(task_queue_);
bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_);
bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_); bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_);
PeriodicTask* pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_) PeriodicTask* pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_)
RTC_PT_GUARDED_BY(task_queue_); RTC_PT_GUARDED_BY(task_queue_);

View File

@ -47,15 +47,6 @@ class MockRtcpIntraFrameObserver : public RtcpIntraFrameObserver {
MOCK_METHOD1(OnReceivedIntraFrameRequest, void(uint32_t)); MOCK_METHOD1(OnReceivedIntraFrameRequest, void(uint32_t));
}; };
class MockCongestionObserver : public NetworkChangedObserver {
public:
MOCK_METHOD4(OnNetworkChanged,
void(uint32_t bitrate_bps,
uint8_t fraction_loss,
int64_t rtt_ms,
int64_t probing_interval_ms));
};
RtpSenderObservers CreateObservers( RtpSenderObservers CreateObservers(
RtcpRttStats* rtcp_rtt_stats, RtcpRttStats* rtcp_rtt_stats,
RtcpIntraFrameObserver* intra_frame_callback, RtcpIntraFrameObserver* intra_frame_callback,
@ -115,7 +106,6 @@ class RtpVideoSenderTestFixture {
private: private:
NiceMock<MockTransport> transport_; NiceMock<MockTransport> transport_;
NiceMock<MockCongestionObserver> congestion_observer_;
NiceMock<MockRtcpIntraFrameObserver> encoder_feedback_; NiceMock<MockRtcpIntraFrameObserver> encoder_feedback_;
SimulatedClock clock_; SimulatedClock clock_;
RtcEventLogNullImpl event_log_; RtcEventLogNullImpl event_log_;

View File

@ -34,93 +34,41 @@ bool IsPacerEmergencyStopDisabled() {
} }
} // namespace } // namespace
CongestionControlHandler::CongestionControlHandler( CongestionControlHandler::CongestionControlHandler()
NetworkChangedObserver* observer, : pacer_pushback_experiment_(IsPacerPushbackExperimentEnabled()),
PacedSender* pacer)
: observer_(observer),
pacer_(pacer),
pacer_pushback_experiment_(IsPacerPushbackExperimentEnabled()),
disable_pacer_emergency_stop_(IsPacerEmergencyStopDisabled()) { disable_pacer_emergency_stop_(IsPacerEmergencyStopDisabled()) {
sequenced_checker_.Detach(); sequenced_checker_.Detach();
} }
CongestionControlHandler::~CongestionControlHandler() {} CongestionControlHandler::~CongestionControlHandler() {}
void CongestionControlHandler::PostUpdates(NetworkControlUpdate update) { void CongestionControlHandler::SetTargetRate(
TargetTransferRate new_target_rate) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
if (update.congestion_window) { last_incoming_ = new_target_rate;
if (update.congestion_window->IsFinite())
pacer_->SetCongestionWindow(update.congestion_window->bytes());
else
pacer_->SetCongestionWindow(PacedSender::kNoCongestionWindow);
}
if (update.pacer_config) {
pacer_->SetPacingRates(update.pacer_config->data_rate().bps(),
update.pacer_config->pad_rate().bps());
}
for (const auto& probe : update.probe_cluster_configs) {
int64_t bitrate_bps = probe.target_data_rate.bps();
pacer_->CreateProbeCluster(bitrate_bps);
}
if (update.target_rate) {
current_target_rate_msg_ = *update.target_rate;
OnNetworkInvalidation();
}
} }
void CongestionControlHandler::OnNetworkAvailability(NetworkAvailability msg) { void CongestionControlHandler::SetNetworkAvailability(bool network_available) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
if (network_available_ != msg.network_available) { network_available_ = network_available;
network_available_ = msg.network_available;
pacer_->UpdateOutstandingData(0);
SetPacerState(!msg.network_available);
OnNetworkInvalidation();
}
} }
void CongestionControlHandler::OnOutstandingData(DataSize in_flight_data) { void CongestionControlHandler::SetPacerQueue(TimeDelta expected_queue_time) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
pacer_->UpdateOutstandingData(in_flight_data.bytes());
OnNetworkInvalidation();
}
void CongestionControlHandler::OnPacerQueueUpdate(
TimeDelta expected_queue_time) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_); RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
pacer_expected_queue_ms_ = expected_queue_time.ms(); pacer_expected_queue_ms_ = expected_queue_time.ms();
OnNetworkInvalidation();
} }
void CongestionControlHandler::SetPacerState(bool paused) { absl::optional<TargetTransferRate> CongestionControlHandler::GetUpdate() {
if (paused && !pacer_paused_) RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
pacer_->Pause(); if (!last_incoming_.has_value())
else if (!paused && pacer_paused_) return absl::nullopt;
pacer_->Resume(); TargetTransferRate new_outgoing = *last_incoming_;
pacer_paused_ = paused; DataRate log_target_rate = new_outgoing.target_rate;
} bool pause_encoding = false;
void CongestionControlHandler::OnNetworkInvalidation() {
if (!current_target_rate_msg_.has_value())
return;
uint32_t target_bitrate_bps = current_target_rate_msg_->target_rate.bps();
int64_t rtt_ms =
current_target_rate_msg_->network_estimate.round_trip_time.ms();
float loss_rate_ratio =
current_target_rate_msg_->network_estimate.loss_rate_ratio;
int loss_ratio_255 = loss_rate_ratio * 255;
uint8_t fraction_loss =
rtc::dchecked_cast<uint8_t>(rtc::SafeClamp(loss_ratio_255, 0, 255));
int64_t probing_interval_ms =
current_target_rate_msg_->network_estimate.bwe_period.ms();
if (!network_available_) { if (!network_available_) {
target_bitrate_bps = 0; pause_encoding = true;
} else if (pacer_pushback_experiment_) { } else if (pacer_pushback_experiment_) {
int64_t queue_length_ms = pacer_expected_queue_ms_; const int64_t queue_length_ms = pacer_expected_queue_ms_;
if (queue_length_ms == 0) { if (queue_length_ms == 0) {
encoding_rate_ratio_ = 1.0; encoding_rate_ratio_ = 1.0;
} else if (queue_length_ms > 50) { } else if (queue_length_ms > 50) {
@ -128,46 +76,31 @@ void CongestionControlHandler::OnNetworkInvalidation() {
encoding_rate_ratio_ = std::min(encoding_rate_ratio_, encoding_ratio); encoding_rate_ratio_ = std::min(encoding_rate_ratio_, encoding_ratio);
encoding_rate_ratio_ = std::max(encoding_rate_ratio_, 0.0); encoding_rate_ratio_ = std::max(encoding_rate_ratio_, 0.0);
} }
new_outgoing.target_rate = new_outgoing.target_rate * encoding_rate_ratio_;
target_bitrate_bps *= encoding_rate_ratio_; log_target_rate = new_outgoing.target_rate;
target_bitrate_bps = target_bitrate_bps < 50000 ? 0 : target_bitrate_bps; if (new_outgoing.target_rate < DataRate::kbps(50))
} else if (!disable_pacer_emergency_stop_) { pause_encoding = true;
target_bitrate_bps = IsSendQueueFull() ? 0 : target_bitrate_bps; } else if (!disable_pacer_emergency_stop_ &&
pacer_expected_queue_ms_ > PacedSender::kMaxQueueLengthMs) {
pause_encoding = true;
} }
if (pause_encoding)
if (HasNetworkParametersToReportChanged(target_bitrate_bps, fraction_loss, new_outgoing.target_rate = DataRate::Zero();
rtt_ms)) { if (!last_reported_ ||
observer_->OnNetworkChanged(target_bitrate_bps, fraction_loss, rtt_ms, last_reported_->target_rate != new_outgoing.target_rate ||
probing_interval_ms); (!new_outgoing.target_rate.IsZero() &&
(last_reported_->network_estimate.loss_rate_ratio !=
new_outgoing.network_estimate.loss_rate_ratio ||
last_reported_->network_estimate.round_trip_time !=
new_outgoing.network_estimate.round_trip_time))) {
if (encoder_paused_in_last_report_ != pause_encoding)
RTC_LOG(LS_INFO) << "Bitrate estimate state changed, BWE: "
<< ToString(log_target_rate) << ".";
encoder_paused_in_last_report_ = pause_encoding;
last_reported_ = new_outgoing;
return new_outgoing;
} }
} return absl::nullopt;
bool CongestionControlHandler::HasNetworkParametersToReportChanged(
int64_t target_bitrate_bps,
uint8_t fraction_loss,
int64_t rtt_ms) {
bool changed = last_reported_target_bitrate_bps_ != target_bitrate_bps ||
(target_bitrate_bps > 0 &&
(last_reported_fraction_loss_ != fraction_loss ||
last_reported_rtt_ms_ != rtt_ms));
if (changed &&
(last_reported_target_bitrate_bps_ == 0 || target_bitrate_bps == 0)) {
RTC_LOG(LS_INFO) << "Bitrate estimate state changed, BWE: "
<< target_bitrate_bps << " bps.";
}
last_reported_target_bitrate_bps_ = target_bitrate_bps;
last_reported_fraction_loss_ = fraction_loss;
last_reported_rtt_ms_ = rtt_ms;
return changed;
}
bool CongestionControlHandler::IsSendQueueFull() const {
return pacer_expected_queue_ms_ > PacedSender::kMaxQueueLengthMs;
}
absl::optional<TargetTransferRate>
CongestionControlHandler::last_transfer_rate() {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
return current_target_rate_msg_;
} }
} // namespace webrtc } // namespace webrtc

View File

@ -17,7 +17,6 @@
#include "api/transport/network_types.h" #include "api/transport/network_types.h"
#include "api/units/data_size.h" #include "api/units/data_size.h"
#include "api/units/time_delta.h" #include "api/units/time_delta.h"
#include "modules/congestion_controller/include/network_changed_observer.h"
#include "modules/pacing/paced_sender.h" #include "modules/pacing/paced_sender.h"
#include "rtc_base/constructormagic.h" #include "rtc_base/constructormagic.h"
#include "rtc_base/sequenced_task_checker.h" #include "rtc_base/sequenced_task_checker.h"
@ -30,46 +29,27 @@ namespace webrtc {
// destruction unless members are properly ordered. // destruction unless members are properly ordered.
class CongestionControlHandler { class CongestionControlHandler {
public: public:
CongestionControlHandler(NetworkChangedObserver* observer, CongestionControlHandler();
PacedSender* pacer);
~CongestionControlHandler(); ~CongestionControlHandler();
void PostUpdates(NetworkControlUpdate update); void SetTargetRate(TargetTransferRate new_target_rate);
void SetNetworkAvailability(bool network_available);
void OnNetworkAvailability(NetworkAvailability msg); void SetPacerQueue(TimeDelta expected_queue_time);
void OnOutstandingData(DataSize in_flight_data); absl::optional<TargetTransferRate> GetUpdate();
void OnPacerQueueUpdate(TimeDelta expected_queue_time);
absl::optional<TargetTransferRate> last_transfer_rate();
private: private:
void SetPacerState(bool paused); absl::optional<TargetTransferRate> last_incoming_;
void OnNetworkInvalidation(); absl::optional<TargetTransferRate> last_reported_;
bool IsSendQueueFull() const;
bool HasNetworkParametersToReportChanged(int64_t bitrate_bps,
float loss_rate_ratio,
TimeDelta rtt);
bool HasNetworkParametersToReportChanged(int64_t bitrate_bps,
uint8_t fraction_loss,
int64_t rtt);
NetworkChangedObserver* observer_ = nullptr;
PacedSender* const pacer_;
absl::optional<TargetTransferRate> current_target_rate_msg_;
bool network_available_ = true; bool network_available_ = true;
bool pacer_paused_ = false; bool encoder_paused_in_last_report_ = false;
int64_t last_reported_target_bitrate_bps_ = 0;
uint8_t last_reported_fraction_loss_ = 0;
int64_t last_reported_rtt_ms_ = 0;
const bool pacer_pushback_experiment_; const bool pacer_pushback_experiment_;
const bool disable_pacer_emergency_stop_; const bool disable_pacer_emergency_stop_;
uint32_t min_pushback_target_bitrate_bps_;
int64_t pacer_expected_queue_ms_ = 0; int64_t pacer_expected_queue_ms_ = 0;
double encoding_rate_ratio_ = 1.0; double encoding_rate_ratio_ = 1.0;
rtc::SequencedTaskChecker sequenced_checker_; rtc::SequencedTaskChecker sequenced_checker_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(CongestionControlHandler); RTC_DISALLOW_COPY_AND_ASSIGN(CongestionControlHandler);
}; };
} // namespace webrtc } // namespace webrtc
#endif // MODULES_CONGESTION_CONTROLLER_RTP_CONTROL_HANDLER_H_ #endif // MODULES_CONGESTION_CONTROLLER_RTP_CONTROL_HANDLER_H_