Using shared task queue for congestion controller.

This simplifies the code and removes the need for a lot of bookkeeping
variables.

Bug: webrtc:9232
Change-Id: I0c9a4b0741ed5353caa22ba5acdcb166357441f2
Reviewed-on: https://webrtc-review.googlesource.com/74240
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#23149}
This commit is contained in:
Sebastian Jansson
2018-05-07 16:33:50 +02:00
committed by Commit Bot
parent 0c4f7beb25
commit bd9fe092ce
5 changed files with 67 additions and 91 deletions

View File

@ -29,6 +29,7 @@ bool TaskQueueExperimentEnabled() {
std::unique_ptr<SendSideCongestionControllerInterface> CreateController(
Clock* clock,
rtc::TaskQueue* task_queue,
webrtc::RtcEventLog* event_log,
PacedSender* pacer,
const BitrateConstraints& bitrate_config,
@ -36,7 +37,7 @@ std::unique_ptr<SendSideCongestionControllerInterface> CreateController(
if (task_queue_controller) {
RTC_LOG(LS_INFO) << "Using TaskQueue based SSCC";
return rtc::MakeUnique<webrtc::webrtc_cc::SendSideCongestionController>(
clock, event_log, pacer, bitrate_config.start_bitrate_bps,
clock, task_queue, event_log, pacer, bitrate_config.start_bitrate_bps,
bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps);
}
RTC_LOG(LS_INFO) << "Using Legacy SSCC";
@ -59,13 +60,12 @@ RtpTransportControllerSend::RtpTransportControllerSend(
bitrate_configurator_(bitrate_config),
process_thread_(ProcessThread::Create("SendControllerThread")),
observer_(nullptr),
send_side_cc_(CreateController(clock,
event_log,
&pacer_,
bitrate_config,
TaskQueueExperimentEnabled())),
task_queue_("rtp_send_controller") {
send_side_cc_ptr_ = send_side_cc_.get();
// Created after task_queue to be able to post to the task queue internally.
send_side_cc_ =
CreateController(clock, &task_queue_, event_log, &pacer_, bitrate_config,
TaskQueueExperimentEnabled());
process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE);
process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE);
process_thread_->Start();
@ -89,7 +89,7 @@ void RtpTransportControllerSend::OnNetworkChanged(uint32_t bitrate_bps,
msg.network_estimate.at_time = msg.at_time;
msg.network_estimate.bwe_period = TimeDelta::ms(probing_interval_ms);
uint32_t bandwidth_bps;
if (send_side_cc_ptr_->AvailableBandwidth(&bandwidth_bps))
if (send_side_cc_->AvailableBandwidth(&bandwidth_bps))
msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps);
msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0;
msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms);

View File

@ -93,17 +93,7 @@ class RtpTransportControllerSend final
const std::unique_ptr<ProcessThread> process_thread_;
rtc::CriticalSection observer_crit_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_);
// Caches send_side_cc_.get(), to avoid racing with destructor.
// Note that this is declared before send_side_cc_ to ensure that it is not
// invalidated until no more tasks can be running on the send_side_cc_ task
// queue.
// TODO(srte): Remove this when only the task queue based send side congestion
// controller is used and it is no longer accessed synchronously in the
// OnNetworkChanged callback.
SendSideCongestionControllerInterface* send_side_cc_ptr_;
// Declared last since it will issue callbacks from a task queue. Declaring it
// last ensures that it is destroyed first.
const std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
// TODO(perkj): |task_queue_| is supposed to replace |process_thread_|.
// |task_queue_| is defined last to ensure all pending tasks are cancelled
// and deleted before any other members.

View File

@ -66,6 +66,7 @@ class SendSideCongestionController
public RtcpBandwidthObserver {
public:
SendSideCongestionController(const Clock* clock,
rtc::TaskQueue* task_queue,
RtcEventLog* event_log,
PacedSender* pacer,
int start_bitrate_bps,
@ -150,18 +151,18 @@ class SendSideCongestionController
void WaitOnTasksForTest();
private:
void MaybeCreateControllers() RTC_RUN_ON(task_queue_ptr_);
void MaybeRecreateControllers() RTC_RUN_ON(task_queue_ptr_);
void MaybeCreateControllers() RTC_RUN_ON(task_queue_);
void MaybeRecreateControllers() RTC_RUN_ON(task_queue_);
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_ptr_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_ptr_);
void UpdatePacerQueue() RTC_RUN_ON(task_queue_ptr_);
void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_ptr_);
void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
void MaybeUpdateOutstandingData();
void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
int64_t now_ms)
RTC_RUN_ON(task_queue_ptr_);
RTC_RUN_ON(task_queue_);
const Clock* const clock_;
// PacedSender is thread safe and doesn't need protection here.
@ -170,57 +171,47 @@ class SendSideCongestionController
TransportFeedbackAdapter transport_feedback_adapter_;
const std::unique_ptr<NetworkControllerFactoryInterface>
controller_factory_with_feedback_ RTC_GUARDED_BY(task_queue_ptr_);
controller_factory_with_feedback_ RTC_GUARDED_BY(task_queue_);
const std::unique_ptr<NetworkControllerFactoryInterface>
controller_factory_fallback_ RTC_GUARDED_BY(task_queue_ptr_);
controller_factory_fallback_ RTC_GUARDED_BY(task_queue_);
const std::unique_ptr<PacerController> pacer_controller_
RTC_GUARDED_BY(task_queue_ptr_);
RTC_GUARDED_BY(task_queue_);
std::unique_ptr<send_side_cc_internal::ControlHandler> control_handler_
RTC_GUARDED_BY(task_queue_ptr_);
RTC_GUARDED_BY(task_queue_);
std::unique_ptr<NetworkControllerInterface> controller_
RTC_GUARDED_BY(task_queue_ptr_);
RTC_GUARDED_BY(task_queue_);
TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_ptr_);
TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_);
std::map<uint32_t, RTCPReportBlock> last_report_blocks_
RTC_GUARDED_BY(task_queue_ptr_);
Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_ptr_);
RTC_GUARDED_BY(task_queue_);
Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_);
NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_ptr_);
NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_ptr_);
StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_ptr_);
NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_);
NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_);
StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_);
const bool send_side_bwe_with_overhead_;
// Transport overhead is written by OnNetworkRouteChanged and read by
// AddPacket.
// TODO(srte): Remove atomic when feedback adapter runs on task queue.
std::atomic<size_t> transport_overhead_bytes_per_packet_;
bool network_available_ RTC_GUARDED_BY(task_queue_ptr_);
bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_ptr_);
bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_ptr_);
bool network_available_ RTC_GUARDED_BY(task_queue_);
bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_);
bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_);
send_side_cc_internal::PeriodicTask* pacer_queue_update_task_
RTC_GUARDED_BY(task_queue_ptr_);
RTC_GUARDED_BY(task_queue_);
send_side_cc_internal::PeriodicTask* controller_task_
RTC_GUARDED_BY(task_queue_ptr_);
RTC_GUARDED_BY(task_queue_);
// Protects access to last_packet_feedback_vector_ in feedback adapter.
// TODO(srte): Remove this checker when feedback adapter runs on task queue.
rtc::RaceChecker worker_race_;
// Caches task_queue_.get(), to avoid racing with destructor.
// Note that this is declared before task_queue_ to ensure that it is not
// invalidated until no more tasks can be running on the task queue.
rtc::TaskQueue* task_queue_ptr_;
// Note that moving ownership of the task queue makes it neccessary to make
// sure that there is no outstanding tasks on it using destructed objects.
// This is currently guranteed by using explicit reset in the destructor of
// this class. It is declared last to indicate that it's lifetime is shorter
// than all other members.
std::unique_ptr<rtc::TaskQueue> task_queue_;
rtc::TaskQueue* task_queue_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SendSideCongestionController);
};

View File

@ -305,6 +305,7 @@ rtc::Optional<TargetTransferRate> ControlHandler::last_transfer_rate() {
SendSideCongestionController::SendSideCongestionController(
const Clock* clock,
rtc::TaskQueue* task_queue,
RtcEventLog* event_log,
PacedSender* pacer,
int start_bitrate_bps,
@ -328,8 +329,7 @@ SendSideCongestionController::SendSideCongestionController(
packet_feedback_available_(false),
pacer_queue_update_task_(nullptr),
controller_task_(nullptr),
task_queue_(MakeUnique<rtc::TaskQueue>("SendSideCCQueue")) {
task_queue_ptr_ = task_queue_.get();
task_queue_(task_queue) {
initial_config_.constraints =
ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
RTC_DCHECK(start_bitrate_bps > 0);
@ -381,13 +381,7 @@ void SendSideCongestionController::MaybeRecreateControllers() {
RTC_DCHECK(controller_);
}
SendSideCongestionController::~SendSideCongestionController() {
// Must be destructed before any objects used by calls on the task queue.
task_queue_.reset();
// Singe the task queue has been destructed, it is now safe to reset
// task_queue_raw_ which is only used by tasks on the task queue.
task_queue_ptr_ = nullptr;
}
SendSideCongestionController::~SendSideCongestionController() = default;
void SendSideCongestionController::RegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
@ -402,7 +396,7 @@ void SendSideCongestionController::DeRegisterPacketFeedbackObserver(
void SendSideCongestionController::RegisterNetworkObserver(
NetworkChangedObserver* observer) {
task_queue_->PostTask([this, observer]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
RTC_DCHECK(observer_ == nullptr);
observer_ = observer;
MaybeCreateControllers();
@ -415,7 +409,7 @@ void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps,
TargetRateConstraints constraints =
ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
task_queue_->PostTask([this, constraints, start_bitrate_bps]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_) {
control_handler_->PostUpdates(
controller_->OnTargetRateConstraints(constraints));
@ -433,7 +427,7 @@ void SendSideCongestionController::SetAllocatedSendBitrateLimits(
int64_t max_total_bitrate_bps) {
task_queue_->PostTask([this, min_send_bitrate_bps, max_padding_bitrate_bps,
max_total_bitrate_bps]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps);
streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps);
streams_config_.max_total_allocated_bitrate =
@ -460,7 +454,7 @@ void SendSideCongestionController::OnNetworkRouteChanged(
if (start_bitrate_bps > 0)
msg.starting_rate = DataRate::bps(start_bitrate_bps);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg));
} else {
@ -479,7 +473,7 @@ bool SendSideCongestionController::AvailableBandwidth(
// running on the task queue.
// TODO(srte): Remove this function when RtpTransportControllerSend stops
// calling it.
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
if (!control_handler_) {
return false;
}
@ -500,7 +494,7 @@ RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() {
void SendSideCongestionController::SetPerPacketFeedbackAvailable(
bool available) {
task_queue_->PostTask([this, available]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
packet_feedback_available_ = available;
MaybeRecreateControllers();
});
@ -508,7 +502,7 @@ void SendSideCongestionController::SetPerPacketFeedbackAvailable(
void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) {
task_queue_->PostTask([this, enable]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.requests_alr_probing = enable;
UpdateStreamsConfig();
});
@ -533,7 +527,7 @@ void SendSideCongestionController::SignalNetworkState(NetworkState state) {
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.network_available = state == kNetworkUp;
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
network_available_ = msg.network_available;
if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg));
@ -560,7 +554,7 @@ void SendSideCongestionController::OnSentPacket(
msg.size = DataSize::bytes(packet->payload_size);
msg.send_time = Timestamp::ms(packet->send_time_ms);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnSentPacket(msg));
});
@ -575,7 +569,7 @@ void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
report.round_trip_time = TimeDelta::ms(avg_rtt_ms);
report.smoothed = true;
task_queue_->PostTask([this, report]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
});
@ -594,9 +588,9 @@ void SendSideCongestionController::StartProcessPeriodicTasks() {
if (!periodic_tasks_enabled_)
return;
if (!pacer_queue_update_task_) {
pacer_queue_update_task_ = StartPeriodicTask(
task_queue_ptr_, PacerQueueUpdateIntervalMs, [this]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
pacer_queue_update_task_ =
StartPeriodicTask(task_queue_, PacerQueueUpdateIntervalMs, [this]() {
RTC_DCHECK_RUN_ON(task_queue_);
UpdatePacerQueue();
});
}
@ -611,8 +605,8 @@ void SendSideCongestionController::StartProcessPeriodicTasks() {
// queue is destroyed or some time after Stop() is called, whichever comes
// first.
controller_task_ =
StartPeriodicTask(task_queue_ptr_, process_interval_.ms(), [this]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
StartPeriodicTask(task_queue_, process_interval_.ms(), [this]() {
RTC_DCHECK_RUN_ON(task_queue_);
UpdateControllerWithTimeInterval();
});
}
@ -668,7 +662,7 @@ void SendSideCongestionController::OnTransportFeedback(
msg.data_in_flight =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(
controller_->OnTransportPacketsFeedback(msg));
@ -680,7 +674,7 @@ void SendSideCongestionController::MaybeUpdateOutstandingData() {
DataSize in_flight_data =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, in_flight_data]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
pacer_controller_->OnOutstandingData(in_flight_data);
});
}
@ -693,7 +687,7 @@ SendSideCongestionController::GetTransportFeedbackVector() const {
void SendSideCongestionController::PostPeriodicTasksForTest() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
UpdateControllerWithTimeInterval();
UpdatePacerQueue();
});
@ -707,7 +701,7 @@ void SendSideCongestionController::WaitOnTasksForTest() {
void SendSideCongestionController::SetPacingFactor(float pacing_factor) {
task_queue_->PostTask([this, pacing_factor]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.pacing_factor = pacing_factor;
UpdateStreamsConfig();
});
@ -715,7 +709,7 @@ void SendSideCongestionController::SetPacingFactor(float pacing_factor) {
void SendSideCongestionController::DisablePeriodicTasks() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
periodic_tasks_enabled_ = false;
});
}
@ -726,7 +720,7 @@ void SendSideCongestionController::OnReceivedEstimatedBitrate(
msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.bandwidth = DataRate::bps(bitrate);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg));
});
@ -737,12 +731,12 @@ void SendSideCongestionController::OnReceivedRtcpReceiverReport(
int64_t rtt_ms,
int64_t now_ms) {
task_queue_->PostTask([this, report_blocks, now_ms]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
});
task_queue_->PostTask([this, now_ms, rtt_ms]() {
RTC_DCHECK_RUN_ON(task_queue_ptr_);
RTC_DCHECK_RUN_ON(task_queue_);
RoundTripTimeUpdate report;
report.receive_time = Timestamp::ms(now_ms);
report.round_trip_time = TimeDelta::ms(rtt_ms);

View File

@ -74,10 +74,10 @@ class SendSideCongestionControllerTest : public ::testing::Test {
SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _));
EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 3));
EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 5));
task_queue_ = rtc::MakeUnique<rtc::TaskQueue>("SSCC Test");
controller_.reset(new SendSideCongestionControllerForTest(
&clock_, &event_log_, pacer_.get(), kInitialBitrateBps, 0,
5 * kInitialBitrateBps));
&clock_, task_queue_.get(), &event_log_, pacer_.get(),
kInitialBitrateBps, 0, 5 * kInitialBitrateBps));
controller_->DisablePeriodicTasks();
controller_->RegisterNetworkObserver(&observer_);
controller_->SignalNetworkState(NetworkState::kNetworkUp);
@ -94,9 +94,10 @@ class SendSideCongestionControllerTest : public ::testing::Test {
void TargetBitrateTrackingSetup() {
bandwidth_observer_ = nullptr;
pacer_.reset(new NiceMock<MockPacedSender>());
task_queue_ = rtc::MakeUnique<rtc::TaskQueue>("SSCC Test");
controller_.reset(new SendSideCongestionControllerForTest(
&clock_, &event_log_, pacer_.get(), kInitialBitrateBps, 0,
5 * kInitialBitrateBps));
&clock_, task_queue_.get(), &event_log_, pacer_.get(),
kInitialBitrateBps, 0, 5 * kInitialBitrateBps));
controller_->DisablePeriodicTasks();
controller_->RegisterNetworkObserver(&target_bitrate_observer_);
controller_->SignalNetworkState(NetworkState::kNetworkUp);
@ -166,8 +167,8 @@ class SendSideCongestionControllerTest : public ::testing::Test {
PacketRouter packet_router_;
std::unique_ptr<NiceMock<MockPacedSender>> pacer_;
std::unique_ptr<SendSideCongestionControllerForTest> controller_;
rtc::Optional<uint32_t> target_bitrate_bps_;
std::unique_ptr<rtc::TaskQueue> task_queue_;
};
TEST_F(SendSideCongestionControllerTest, OnNetworkChanged) {