From 62d01cde6f6ec1fa91b1e5234a7922ad1a4ce036 Mon Sep 17 00:00:00 2001 From: Sebastian Jansson Date: Fri, 6 Dec 2019 17:35:02 +0100 Subject: [PATCH] Moves TransportFeedbackAdapter to TaskQueue. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: webrtc:9883 Change-Id: Id87e281751d98043f4470df5a71d458f4cd654c1 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158793 Commit-Queue: Sebastian Jansson Reviewed-by: Erik Språng Cr-Commit-Position: refs/heads/master@{#30037} --- call/rtp_transport_controller_send.cc | 73 +++++++-------- call/rtp_transport_controller_send.h | 12 ++- call/rtp_video_sender_unittest.cc | 38 +++++--- .../bbr/bbr_network_controller_unittest.cc | 4 +- .../rtp/transport_feedback_adapter.cc | 89 ++++++++----------- .../rtp/transport_feedback_adapter.h | 29 +++--- 6 files changed, 122 insertions(+), 123 deletions(-) diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc index bd8e2d0303..986fa09243 100644 --- a/call/rtp_transport_controller_send.cc +++ b/call/rtp_transport_controller_send.cc @@ -22,6 +22,7 @@ #include "call/rtp_video_sender.h" #include "logging/rtc_event_log/events/rtc_event_remote_estimate.h" #include "logging/rtc_event_log/events/rtc_event_route_change.h" +#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" #include "rtc_base/rate_limiter.h" @@ -228,6 +229,7 @@ void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) { } StreamFeedbackProvider* RtpTransportControllerSend::GetStreamFeedbackProvider() { + RTC_DCHECK_RUN_ON(&task_queue_); return &transport_feedback_adapter_; } @@ -278,11 +280,6 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( << " bps."; RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0); - if (reset_feedback_on_route_change_) - transport_feedback_adapter_.SetNetworkIds( - network_route.local_network_id, network_route.remote_network_id); - transport_overhead_bytes_per_packet_ = network_route.packet_overhead; - if (event_log_) { event_log_->Log(std::make_unique( network_route.connected, network_route.packet_overhead)); @@ -290,8 +287,13 @@ void RtpTransportControllerSend::OnNetworkRouteChanged( NetworkRouteChange msg; msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.constraints = ConvertConstraints(bitrate_config, clock_); - task_queue_.PostTask([this, msg] { + task_queue_.PostTask([this, msg, network_route] { RTC_DCHECK_RUN_ON(&task_queue_); + transport_overhead_bytes_per_packet_ = network_route.packet_overhead; + if (reset_feedback_on_route_change_) { + transport_feedback_adapter_.SetNetworkIds( + network_route.local_network_id, network_route.remote_network_id); + } if (controller_) { PostUpdates(controller_->OnNetworkRouteChange(msg)); } else { @@ -351,17 +353,15 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) { } void RtpTransportControllerSend::OnSentPacket( const rtc::SentPacket& sent_packet) { - absl::optional packet_msg = - transport_feedback_adapter_.ProcessSentPacket(sent_packet); - if (packet_msg) { - task_queue_.PostTask([this, packet_msg]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (controller_) - PostUpdates(controller_->OnSentPacket(*packet_msg)); - }); - } - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); + task_queue_.PostTask([this, sent_packet]() { + RTC_DCHECK_RUN_ON(&task_queue_); + absl::optional packet_msg = + transport_feedback_adapter_.ProcessSentPacket(sent_packet); + pacer()->UpdateOutstandingData( + transport_feedback_adapter_.GetOutstandingData()); + if (packet_msg && controller_) + PostUpdates(controller_->OnSentPacket(*packet_msg)); + }); } void RtpTransportControllerSend::OnReceivedPacket( @@ -468,29 +468,30 @@ void RtpTransportControllerSend::OnReceivedRtcpReceiverReport( void RtpTransportControllerSend::OnAddPacket( const RtpPacketSendInfo& packet_info) { - transport_feedback_adapter_.AddPacket( - packet_info, - send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load() - : 0, - Timestamp::ms(clock_->TimeInMilliseconds())); + auto creation_time = Timestamp::ms(clock_->TimeInMilliseconds()); + task_queue_.PostTask([this, packet_info, creation_time]() { + RTC_DCHECK_RUN_ON(&task_queue_); + transport_feedback_adapter_.AddPacket( + packet_info, + send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_ : 0, + creation_time); + }); } void RtpTransportControllerSend::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { - RTC_DCHECK_RUNS_SERIALIZED(&worker_race_); - - absl::optional feedback_msg = - transport_feedback_adapter_.ProcessTransportFeedback( - feedback, Timestamp::ms(clock_->TimeInMilliseconds())); - if (feedback_msg) { - task_queue_.PostTask([this, feedback_msg]() { - RTC_DCHECK_RUN_ON(&task_queue_); - if (controller_) - PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); - }); - } - pacer()->UpdateOutstandingData( - transport_feedback_adapter_.GetOutstandingData()); + auto feedback_time = Timestamp::ms(clock_->TimeInMilliseconds()); + task_queue_.PostTask([this, feedback, feedback_time]() { + RTC_DCHECK_RUN_ON(&task_queue_); + absl::optional feedback_msg = + transport_feedback_adapter_.ProcessTransportFeedback(feedback, + feedback_time); + if (feedback_msg && controller_) { + PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); + } + pacer()->UpdateOutstandingData( + transport_feedback_adapter_.GetOutstandingData()); + }); } void RtpTransportControllerSend::OnRemoteNetworkEstimate( diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h index b5a53d7fe7..00b4c63be6 100644 --- a/call/rtp_transport_controller_send.h +++ b/call/rtp_transport_controller_send.h @@ -151,7 +151,8 @@ class RtpTransportControllerSend final TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); // TODO(srte): Move all access to feedback adapter to task queue. - TransportFeedbackAdapter transport_feedback_adapter_; + TransportFeedbackAdapter transport_feedback_adapter_ + RTC_GUARDED_BY(task_queue_); NetworkControllerFactoryInterface* const controller_factory_override_ RTC_PT_GUARDED_BY(task_queue_); @@ -176,16 +177,13 @@ class RtpTransportControllerSend final const bool reset_feedback_on_route_change_; const bool send_side_bwe_with_overhead_; const bool add_pacing_to_cwin_; - // Transport overhead is written by OnNetworkRouteChanged and read by - // AddPacket. - // TODO(srte): Remove atomic when feedback adapter runs on task queue. - std::atomic transport_overhead_bytes_per_packet_; + + size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(task_queue_); bool network_available_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); - // TODO(srte): Remove this checker when feedback adapter runs on task queue. - rtc::RaceChecker worker_race_; + // Protected by internal locks. RateLimiter retransmission_rate_limiter_; // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|. diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc index 8190eea5f3..94d0931314 100644 --- a/call/rtp_video_sender_unittest.cc +++ b/call/rtp_video_sender_unittest.cc @@ -135,17 +135,24 @@ class RtpVideoSenderTestFixture { VideoEncoderConfig::ContentType::kRealtimeVideo), retransmission_rate_limiter_(time_controller_.GetClock(), kRetransmitWindowSizeMs) { - std::map suspended_ssrcs; - router_ = std::make_unique( - time_controller_.GetClock(), suspended_ssrcs, suspended_payload_states, - config_.rtp, config_.rtcp_report_interval_ms, &transport_, - CreateObservers(&call_stats_, &encoder_feedback_, &stats_proxy_, - &stats_proxy_, &stats_proxy_, &stats_proxy_, - frame_count_observer, &stats_proxy_, &stats_proxy_, - &send_delay_stats_), - &transport_controller_, &event_log_, &retransmission_rate_limiter_, - std::make_unique(time_controller_.GetClock()), - nullptr, CryptoOptions{}); + rtc::Event done; + transport_controller_.GetWorkerQueue()->PostTask([&]() { + std::map suspended_ssrcs; + + router_ = std::make_unique( + time_controller_.GetClock(), suspended_ssrcs, + suspended_payload_states, config_.rtp, + config_.rtcp_report_interval_ms, &transport_, + CreateObservers(&call_stats_, &encoder_feedback_, &stats_proxy_, + &stats_proxy_, &stats_proxy_, &stats_proxy_, + frame_count_observer, &stats_proxy_, &stats_proxy_, + &send_delay_stats_), + &transport_controller_, &event_log_, &retransmission_rate_limiter_, + std::make_unique(time_controller_.GetClock()), + nullptr, CryptoOptions{}); + done.Set(); + }); + done.Wait(rtc::Event::kForever); } RtpVideoSenderTestFixture( const std::vector& ssrcs, @@ -157,7 +164,14 @@ class RtpVideoSenderTestFixture { payload_type, suspended_payload_states, /*frame_count_observer=*/nullptr) {} - + ~RtpVideoSenderTestFixture() { + rtc::Event done; + transport_controller_.GetWorkerQueue()->PostTask([&]() { + router_.reset(); + done.Set(); + }); + done.Wait(rtc::Event::kForever); + } RtpVideoSender* router() { return router_.get(); } MockTransport& transport() { return transport_; } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } diff --git a/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc b/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc index 2a8a224a81..8cf4d17a9f 100644 --- a/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc +++ b/modules/congestion_controller/bbr/bbr_network_controller_unittest.cc @@ -155,8 +155,8 @@ TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) { ret_net->UpdateConfig( [](NetworkSimulationConfig* c) { c->delay = TimeDelta::ms(200); }); - s.RunFor(TimeDelta::seconds(40)); - EXPECT_NEAR(client->send_bandwidth().kbps(), 200, 40); + s.RunFor(TimeDelta::seconds(35)); + EXPECT_NEAR(client->send_bandwidth().kbps(), 180, 50); } } // namespace test diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.cc b/modules/congestion_controller/rtp/transport_feedback_adapter.cc index b070b0e23a..df52ef1b2a 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.cc @@ -73,7 +73,6 @@ TransportFeedbackAdapter::~TransportFeedbackAdapter() { void TransportFeedbackAdapter::RegisterStreamFeedbackObserver( std::vector ssrcs, StreamFeedbackObserver* observer) { - rtc::CritScope cs(&observers_lock_); RTC_DCHECK(observer); RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) { return pair.second == observer; @@ -83,7 +82,6 @@ void TransportFeedbackAdapter::RegisterStreamFeedbackObserver( void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver( StreamFeedbackObserver* observer) { - rtc::CritScope cs(&observers_lock_); RTC_DCHECK(observer); const auto it = absl::c_find_if( observers_, [=](const auto& pair) { return pair.second == observer; }); @@ -94,35 +92,31 @@ void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver( void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info, size_t overhead_bytes, Timestamp creation_time) { - { - rtc::CritScope cs(&lock_); - PacketFeedback packet; - packet.creation_time = creation_time; - packet.sent.sequence_number = - seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number); - packet.sent.size = DataSize::bytes(packet_info.length + overhead_bytes); - packet.local_net_id = local_net_id_; - packet.remote_net_id = remote_net_id_; - packet.sent.pacing_info = packet_info.pacing_info; - if (packet_info.has_rtp_sequence_number) { - packet.ssrc = packet_info.ssrc; - packet.rtp_sequence_number = packet_info.rtp_sequence_number; - } - - while (!history_.empty() && - creation_time - history_.begin()->second.creation_time > - kSendTimeHistoryWindow) { - // TODO(sprang): Warn if erasing (too many) old items? - if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_) - in_flight_.RemoveInFlightPacketBytes(history_.begin()->second); - history_.erase(history_.begin()); - } - history_.insert(std::make_pair(packet.sent.sequence_number, packet)); + PacketFeedback packet; + packet.creation_time = creation_time; + packet.sent.sequence_number = + seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number); + packet.sent.size = DataSize::bytes(packet_info.length + overhead_bytes); + packet.local_net_id = local_net_id_; + packet.remote_net_id = remote_net_id_; + packet.sent.pacing_info = packet_info.pacing_info; + if (packet_info.has_rtp_sequence_number) { + packet.ssrc = packet_info.ssrc; + packet.rtp_sequence_number = packet_info.rtp_sequence_number; } + + while (!history_.empty() && + creation_time - history_.begin()->second.creation_time > + kSendTimeHistoryWindow) { + // TODO(sprang): Warn if erasing (too many) old items? + if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_) + in_flight_.RemoveInFlightPacketBytes(history_.begin()->second); + history_.erase(history_.begin()); + } + history_.insert(std::make_pair(packet.sent.sequence_number, packet)); } absl::optional TransportFeedbackAdapter::ProcessSentPacket( const rtc::SentPacket& sent_packet) { - rtc::CritScope cs(&lock_); auto send_time = Timestamp::ms(sent_packet.send_time_ms); // TODO(srte): Only use one way to indicate that packet feedback is used. if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) { @@ -171,41 +165,37 @@ TransportFeedbackAdapter::ProcessTransportFeedback( std::vector feedback_vector; TransportPacketsFeedback msg; msg.feedback_time = feedback_receive_time; - { - rtc::CritScope cs(&lock_); - msg.prior_in_flight = - in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); - feedback_vector = - ProcessTransportFeedbackInner(feedback, feedback_receive_time); - if (feedback_vector.empty()) - return absl::nullopt; + msg.prior_in_flight = + in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); + feedback_vector = + ProcessTransportFeedbackInner(feedback, feedback_receive_time); + if (feedback_vector.empty()) + return absl::nullopt; - for (const PacketFeedback& fb : feedback_vector) { - PacketResult res; - res.sent_packet = fb.sent; - res.receive_time = fb.receive_time; - msg.packet_feedbacks.push_back(res); - } - auto it = history_.find(last_ack_seq_num_); - if (it != history_.end()) { - msg.first_unacked_send_time = it->second.sent.send_time; - } - msg.data_in_flight = - in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); + for (const PacketFeedback& fb : feedback_vector) { + PacketResult res; + res.sent_packet = fb.sent; + res.receive_time = fb.receive_time; + msg.packet_feedbacks.push_back(res); } + auto it = history_.find(last_ack_seq_num_); + if (it != history_.end()) { + msg.first_unacked_send_time = it->second.sent.send_time; + } + msg.data_in_flight = + in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); + SignalObservers(feedback_vector); return msg; } void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id, uint16_t remote_id) { - rtc::CritScope cs(&lock_); local_net_id_ = local_id; remote_net_id_ = remote_id; } DataSize TransportFeedbackAdapter::GetOutstandingData() const { - rtc::CritScope cs(&lock_); return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); } @@ -290,7 +280,6 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner( void TransportFeedbackAdapter::SignalObservers( const std::vector& feedback_vector) { - rtc::CritScope cs(&observers_lock_); for (auto& observer : observers_) { std::vector selected_feedback; for (const auto& packet : feedback_vector) { diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.h b/modules/congestion_controller/rtp/transport_feedback_adapter.h index 699c6ed489..392e15c8fa 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.h +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.h @@ -87,36 +87,33 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider { std::vector ProcessTransportFeedbackInner( const rtcp::TransportFeedback& feedback, - Timestamp feedback_time) RTC_RUN_ON(&lock_); + Timestamp feedback_time); void SignalObservers( const std::vector& packet_feedback_vector); - rtc::CriticalSection lock_; - DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero(); - Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); - Timestamp last_untracked_send_time_ RTC_GUARDED_BY(&lock_) = - Timestamp::MinusInfinity(); - SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_); - std::map history_ RTC_GUARDED_BY(&lock_); + DataSize pending_untracked_size_ = DataSize::Zero(); + Timestamp last_send_time_ = Timestamp::MinusInfinity(); + Timestamp last_untracked_send_time_ = Timestamp::MinusInfinity(); + SequenceNumberUnwrapper seq_num_unwrapper_; + std::map history_; // Sequence numbers are never negative, using -1 as it always < a real // sequence number. - int64_t last_ack_seq_num_ RTC_GUARDED_BY(&lock_) = -1; - InFlightBytesTracker in_flight_ RTC_GUARDED_BY(&lock_); + int64_t last_ack_seq_num_ = -1; + InFlightBytesTracker in_flight_; - Timestamp current_offset_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); - TimeDelta last_timestamp_ RTC_GUARDED_BY(&lock_) = TimeDelta::MinusInfinity(); + Timestamp current_offset_ = Timestamp::MinusInfinity(); + TimeDelta last_timestamp_ = TimeDelta::MinusInfinity(); - uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0; - uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0; + uint16_t local_net_id_ = 0; + uint16_t remote_net_id_ = 0; - rtc::CriticalSection observers_lock_; // Maps a set of ssrcs to corresponding observer. Vectors are used rather than // set/map to ensure that the processing order is consistent independently of // the randomized ssrcs. std::vector, StreamFeedbackObserver*>> - observers_ RTC_GUARDED_BY(&observers_lock_); + observers_; }; } // namespace webrtc