diff --git a/modules/congestion_controller/rtp/BUILD.gn b/modules/congestion_controller/rtp/BUILD.gn index b637faeaab..0ed79437a2 100644 --- a/modules/congestion_controller/rtp/BUILD.gn +++ b/modules/congestion_controller/rtp/BUILD.gn @@ -69,8 +69,10 @@ rtc_static_library("transport_feedback") { deps = [ "../..:module_api", "../../../api/transport:network_control", + "../../../api/units:data_size", "../../../rtc_base:checks", "../../../rtc_base:rtc_base_approved", + "../../../rtc_base/network:sent_packet", "../../../system_wrappers", "../../rtp_rtcp:rtp_rtcp_format", ] diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc index a962fd7b7b..771a831fd7 100644 --- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc +++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc @@ -48,41 +48,6 @@ bool IsPacerPushbackExperimentEnabled() { } -void SortPacketFeedbackVector(std::vector* input) { - std::sort(input->begin(), input->end(), PacketFeedbackComparator()); -} - -PacketResult NetworkPacketFeedbackFromRtpPacketFeedback( - const webrtc::PacketFeedback& pf) { - PacketResult feedback; - if (pf.arrival_time_ms == webrtc::PacketFeedback::kNotReceived) - feedback.receive_time = Timestamp::PlusInfinity(); - else - feedback.receive_time = Timestamp::ms(pf.arrival_time_ms); - if (pf.send_time_ms != webrtc::PacketFeedback::kNoSendTime) { - feedback.sent_packet = SentPacket(); - feedback.sent_packet->sequence_number = pf.long_sequence_number; - feedback.sent_packet->send_time = Timestamp::ms(pf.send_time_ms); - feedback.sent_packet->size = DataSize::bytes(pf.payload_size); - feedback.sent_packet->pacing_info = pf.pacing_info; - } - return feedback; -} - -std::vector PacketResultsFromRtpFeedbackVector( - const std::vector& feedback_vector) { - RTC_DCHECK(std::is_sorted(feedback_vector.begin(), feedback_vector.end(), - PacketFeedbackComparator())); - - std::vector packet_feedbacks; - packet_feedbacks.reserve(feedback_vector.size()); - for (const PacketFeedback& rtp_feedback : feedback_vector) { - auto feedback = NetworkPacketFeedbackFromRtpPacketFeedback(rtp_feedback); - packet_feedbacks.push_back(feedback); - } - return packet_feedbacks; -} - TargetRateConstraints ConvertConstraints(int min_bitrate_bps, int max_bitrate_bps, int start_bitrate_bps, @@ -547,29 +512,16 @@ void SendSideCongestionController::SignalNetworkState(NetworkState state) { void SendSideCongestionController::OnSentPacket( const rtc::SentPacket& sent_packet) { - if (sent_packet.packet_id != -1) { - transport_feedback_adapter_.OnSentPacket(sent_packet.packet_id, - sent_packet.send_time_ms); - MaybeUpdateOutstandingData(); - auto packet = transport_feedback_adapter_.GetPacket(sent_packet.packet_id); - if (packet.has_value()) { - SentPacket msg; - msg.size = DataSize::bytes(packet->payload_size); - msg.send_time = Timestamp::ms(packet->send_time_ms); - msg.sequence_number = packet->long_sequence_number; - msg.prior_unacked_data = DataSize::bytes(packet->unacknowledged_data); - msg.data_in_flight = - DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); - task_queue_->PostTask([this, msg]() { - RTC_DCHECK_RUN_ON(task_queue_); - if (controller_) - control_handler_->PostUpdates(controller_->OnSentPacket(msg)); - }); - } - } else if (sent_packet.info.included_in_allocation) { - transport_feedback_adapter_.AddUntracked(sent_packet.info.packet_size_bytes, - sent_packet.send_time_ms); + absl::optional packet_msg = + transport_feedback_adapter_.ProcessSentPacket(sent_packet); + if (packet_msg) { + task_queue_->PostTask([this, packet_msg]() { + RTC_DCHECK_RUN_ON(task_queue_); + if (controller_) + control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg)); + }); } + MaybeUpdateOutstandingData(); } void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms, @@ -654,36 +606,22 @@ void SendSideCongestionController::AddPacket( void SendSideCongestionController::OnTransportFeedback( const rtcp::TransportFeedback& feedback) { RTC_DCHECK_RUNS_SERIALIZED(&worker_race_); - int64_t feedback_time_ms = clock_->TimeInMilliseconds(); - DataSize prior_in_flight = - DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); - transport_feedback_adapter_.OnTransportFeedback(feedback); - MaybeUpdateOutstandingData(); - - std::vector feedback_vector = - transport_feedback_adapter_.GetTransportFeedbackVector(); - SortPacketFeedbackVector(&feedback_vector); - - if (!feedback_vector.empty()) { - TransportPacketsFeedback msg; - msg.packet_feedbacks = PacketResultsFromRtpFeedbackVector(feedback_vector); - msg.feedback_time = Timestamp::ms(feedback_time_ms); - msg.prior_in_flight = prior_in_flight; - msg.data_in_flight = - DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); - task_queue_->PostTask([this, msg]() { + absl::optional feedback_msg = + transport_feedback_adapter_.ProcessTransportFeedback(feedback); + if (feedback_msg) { + task_queue_->PostTask([this, feedback_msg]() { RTC_DCHECK_RUN_ON(task_queue_); if (controller_) control_handler_->PostUpdates( - controller_->OnTransportPacketsFeedback(msg)); + controller_->OnTransportPacketsFeedback(*feedback_msg)); }); } + MaybeUpdateOutstandingData(); } void SendSideCongestionController::MaybeUpdateOutstandingData() { - DataSize in_flight_data = - DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes()); + DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData(); task_queue_->PostTask([this, in_flight_data]() { RTC_DCHECK_RUN_ON(task_queue_); pacer_controller_->OnOutstandingData(in_flight_data); diff --git a/modules/congestion_controller/rtp/send_time_history.cc b/modules/congestion_controller/rtp/send_time_history.cc index 205f86a420..110dac3993 100644 --- a/modules/congestion_controller/rtp/send_time_history.cc +++ b/modules/congestion_controller/rtp/send_time_history.cc @@ -111,13 +111,13 @@ bool SendTimeHistory::GetFeedback(PacketFeedback* packet_feedback, return true; } -size_t SendTimeHistory::GetOutstandingBytes(uint16_t local_net_id, - uint16_t remote_net_id) const { +DataSize SendTimeHistory::GetOutstandingData(uint16_t local_net_id, + uint16_t remote_net_id) const { auto it = in_flight_bytes_.find({local_net_id, remote_net_id}); if (it != in_flight_bytes_.end()) { - return it->second; + return DataSize::bytes(it->second); } else { - return 0; + return DataSize::Zero(); } } diff --git a/modules/congestion_controller/rtp/send_time_history.h b/modules/congestion_controller/rtp/send_time_history.h index 98d3d5cc82..656b7e16a3 100644 --- a/modules/congestion_controller/rtp/send_time_history.h +++ b/modules/congestion_controller/rtp/send_time_history.h @@ -14,6 +14,7 @@ #include #include +#include "api/units/data_size.h" #include "modules/include/module_common_types.h" #include "rtc_base/constructormagic.h" @@ -43,8 +44,8 @@ class SendTimeHistory { // thus be non-null and have the sequence_number field set. bool GetFeedback(PacketFeedback* packet_feedback, bool remove); - size_t GetOutstandingBytes(uint16_t local_net_id, - uint16_t remote_net_id) const; + DataSize GetOutstandingData(uint16_t local_net_id, + uint16_t remote_net_id) const; private: using RemoteAndLocalNetworkId = std::pair; diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.cc b/modules/congestion_controller/rtp/transport_feedback_adapter.cc index f5d2c38336..6ff1740cce 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.cc @@ -20,7 +20,42 @@ namespace webrtc { namespace webrtc_cc { +namespace { +void SortPacketFeedbackVector(std::vector* input) { + std::sort(input->begin(), input->end(), PacketFeedbackComparator()); +} +PacketResult NetworkPacketFeedbackFromRtpPacketFeedback( + const webrtc::PacketFeedback& pf) { + PacketResult feedback; + if (pf.arrival_time_ms == webrtc::PacketFeedback::kNotReceived) + feedback.receive_time = Timestamp::PlusInfinity(); + else + feedback.receive_time = Timestamp::ms(pf.arrival_time_ms); + if (pf.send_time_ms != webrtc::PacketFeedback::kNoSendTime) { + feedback.sent_packet = SentPacket(); + feedback.sent_packet->sequence_number = pf.long_sequence_number; + feedback.sent_packet->send_time = Timestamp::ms(pf.send_time_ms); + feedback.sent_packet->size = DataSize::bytes(pf.payload_size); + feedback.sent_packet->pacing_info = pf.pacing_info; + } + return feedback; +} + +std::vector PacketResultsFromRtpFeedbackVector( + const std::vector& feedback_vector) { + RTC_DCHECK(std::is_sorted(feedback_vector.begin(), feedback_vector.end(), + PacketFeedbackComparator())); + + std::vector packet_feedbacks; + packet_feedbacks.reserve(feedback_vector.size()); + for (const PacketFeedback& rtp_feedback : feedback_vector) { + auto feedback = NetworkPacketFeedbackFromRtpPacketFeedback(rtp_feedback); + packet_feedbacks.push_back(feedback); + } + return packet_feedbacks; +} +} // namespace const int64_t kNoTimestamp = -1; const int64_t kSendTimeHistoryWindowMs = 60000; const int64_t kBaseTimestampScaleFactor = @@ -77,22 +112,49 @@ void TransportFeedbackAdapter::AddPacket(uint32_t ssrc, } } -void TransportFeedbackAdapter::AddUntracked(size_t packet_size, - int64_t send_time_ms) { +absl::optional TransportFeedbackAdapter::ProcessSentPacket( + const rtc::SentPacket& sent_packet) { rtc::CritScope cs(&lock_); - send_time_history_.AddUntracked(packet_size, send_time_ms); + // TODO(srte): Only use one way to indicate that packet feedback is used. + if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) { + send_time_history_.OnSentPacket(sent_packet.packet_id, + sent_packet.send_time_ms); + absl::optional packet = + send_time_history_.GetPacket(sent_packet.packet_id); + if (packet) { + SentPacket msg; + msg.size = DataSize::bytes(packet->payload_size); + msg.send_time = Timestamp::ms(packet->send_time_ms); + msg.sequence_number = packet->long_sequence_number; + msg.prior_unacked_data = DataSize::bytes(packet->unacknowledged_data); + msg.data_in_flight = + send_time_history_.GetOutstandingData(local_net_id_, remote_net_id_); + return msg; + } + } else if (sent_packet.info.included_in_allocation) { + send_time_history_.AddUntracked(sent_packet.info.packet_size_bytes, + sent_packet.send_time_ms); + } + return absl::nullopt; } -void TransportFeedbackAdapter::OnSentPacket(uint16_t sequence_number, - int64_t send_time_ms) { - rtc::CritScope cs(&lock_); - send_time_history_.OnSentPacket(sequence_number, send_time_ms); -} +absl::optional +TransportFeedbackAdapter::ProcessTransportFeedback( + const rtcp::TransportFeedback& feedback) { + int64_t feedback_time_ms = clock_->TimeInMilliseconds(); + DataSize prior_in_flight = GetOutstandingData(); + OnTransportFeedback(feedback); + std::vector feedback_vector = last_packet_feedback_vector_; + if (feedback_vector.empty()) + return absl::nullopt; -absl::optional TransportFeedbackAdapter::GetPacket( - uint16_t sequence_number) const { - rtc::CritScope cs(&lock_); - return send_time_history_.GetPacket(sequence_number); + SortPacketFeedbackVector(&feedback_vector); + TransportPacketsFeedback msg; + msg.packet_feedbacks = PacketResultsFromRtpFeedbackVector(feedback_vector); + msg.feedback_time = Timestamp::ms(feedback_time_ms); + msg.prior_in_flight = prior_in_flight; + msg.data_in_flight = GetOutstandingData(); + return msg; } void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id, @@ -102,6 +164,11 @@ void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id, remote_net_id_ = remote_id; } +DataSize TransportFeedbackAdapter::GetOutstandingData() const { + rtc::CritScope cs(&lock_); + return send_time_history_.GetOutstandingData(local_net_id_, remote_net_id_); +} + std::vector TransportFeedbackAdapter::GetPacketFeedbackVector( const rtcp::TransportFeedback& feedback) { int64_t timestamp_us = feedback.GetBaseTimeUs(); @@ -190,10 +257,5 @@ std::vector TransportFeedbackAdapter::GetTransportFeedbackVector() const { return last_packet_feedback_vector_; } - -size_t TransportFeedbackAdapter::GetOutstandingBytes() const { - rtc::CritScope cs(&lock_); - return send_time_history_.GetOutstandingBytes(local_net_id_, remote_net_id_); -} } // namespace webrtc_cc } // namespace webrtc diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.h b/modules/congestion_controller/rtp/transport_feedback_adapter.h index ee957752ce..8bc5547afc 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter.h +++ b/modules/congestion_controller/rtp/transport_feedback_adapter.h @@ -17,6 +17,7 @@ #include "api/transport/network_types.h" #include "modules/congestion_controller/rtp/send_time_history.h" #include "rtc_base/criticalsection.h" +#include "rtc_base/network/sent_packet.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/thread_checker.h" #include "system_wrappers/include/clock.h" @@ -42,23 +43,22 @@ class TransportFeedbackAdapter { uint16_t sequence_number, size_t length, const PacedPacketInfo& pacing_info); - void AddUntracked(size_t packet_size, int64_t send_time_ms); - void OnSentPacket(uint16_t sequence_number, int64_t send_time_ms); - // TODO(holmer): This method should return DelayBasedBwe::Result so that we - // can get rid of the dependency on BitrateController. Requires changes - // to the CongestionController interface. - void OnTransportFeedback(const rtcp::TransportFeedback& feedback); + absl::optional ProcessSentPacket( + const rtc::SentPacket& sent_packet); + + absl::optional ProcessTransportFeedback( + const rtcp::TransportFeedback& feedback); + std::vector GetTransportFeedbackVector() const; - absl::optional GetPacket(uint16_t sequence_number) const; - - void SetTransportOverhead(int transport_overhead_bytes_per_packet); void SetNetworkIds(uint16_t local_id, uint16_t remote_id); - size_t GetOutstandingBytes() const; + DataSize GetOutstandingData() const; private: + void OnTransportFeedback(const rtcp::TransportFeedback& feedback); + std::vector GetPacketFeedbackVector( const rtcp::TransportFeedback& feedback); diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc b/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc index 70087678a2..d1404df63f 100644 --- a/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc +++ b/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc @@ -69,8 +69,9 @@ class TransportFeedbackAdapterTest : public ::testing::Test { adapter_->AddPacket(kSsrc, packet_feedback.sequence_number, packet_feedback.payload_size, packet_feedback.pacing_info); - adapter_->OnSentPacket(packet_feedback.sequence_number, - packet_feedback.send_time_ms); + adapter_->ProcessSentPacket(rtc::SentPacket(packet_feedback.sequence_number, + packet_feedback.send_time_ms, + rtc::PacketInfo())); } static constexpr uint32_t kSsrc = 8492; @@ -100,7 +101,7 @@ TEST_F(TransportFeedbackAdapterTest, ObserverSanity) { } EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1); - adapter_->OnTransportFeedback(feedback); + adapter_->ProcessTransportFeedback(feedback); adapter_->DeRegisterPacketFeedbackObserver(&mock); @@ -115,7 +116,7 @@ TEST_F(TransportFeedbackAdapterTest, ObserverSanity) { EXPECT_TRUE(feedback.AddReceivedPacket(new_packet.sequence_number, new_packet.arrival_time_ms * 1000)); EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0); - adapter_->OnTransportFeedback(second_feedback); + adapter_->ProcessTransportFeedback(second_feedback); } #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID) @@ -156,7 +157,7 @@ TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) { feedback.Build(); - adapter_->OnTransportFeedback(feedback); + adapter_->ProcessTransportFeedback(feedback); ComparePacketFeedbackVectors(packets, adapter_->GetTransportFeedbackVector()); } @@ -189,7 +190,7 @@ TEST_F(TransportFeedbackAdapterTest, FeedbackVectorReportsUnreceived) { feedback.Build(); - adapter_->OnTransportFeedback(feedback); + adapter_->ProcessTransportFeedback(feedback); ComparePacketFeedbackVectors(sent_packets, adapter_->GetTransportFeedbackVector()); } @@ -233,7 +234,7 @@ TEST_F(TransportFeedbackAdapterTest, HandlesDroppedPackets) { expected_packets[i].pacing_info = PacedPacketInfo(); } - adapter_->OnTransportFeedback(feedback); + adapter_->ProcessTransportFeedback(feedback); ComparePacketFeedbackVectors(expected_packets, adapter_->GetTransportFeedbackVector()); } @@ -269,7 +270,7 @@ TEST_F(TransportFeedbackAdapterTest, SendTimeWrapsBothWays) { std::vector expected_packets; expected_packets.push_back(packets[i]); - adapter_->OnTransportFeedback(*feedback.get()); + adapter_->ProcessTransportFeedback(*feedback.get()); ComparePacketFeedbackVectors(expected_packets, adapter_->GetTransportFeedbackVector()); } @@ -298,7 +299,7 @@ TEST_F(TransportFeedbackAdapterTest, HandlesArrivalReordering) { // Adapter keeps the packets ordered by sequence number (which is itself // assigned by the order of transmission). Reordering by some other criteria, // eg. arrival time, is up to the observers. - adapter_->OnTransportFeedback(feedback); + adapter_->ProcessTransportFeedback(feedback); ComparePacketFeedbackVectors(packets, adapter_->GetTransportFeedbackVector()); } @@ -362,7 +363,7 @@ TEST_F(TransportFeedbackAdapterTest, TimestampDeltas) { std::vector received_feedback; EXPECT_TRUE(feedback.get() != nullptr); - adapter_->OnTransportFeedback(*feedback.get()); + adapter_->ProcessTransportFeedback(*feedback.get()); ComparePacketFeedbackVectors(sent_packets, adapter_->GetTransportFeedbackVector()); @@ -377,7 +378,7 @@ TEST_F(TransportFeedbackAdapterTest, TimestampDeltas) { rtcp::TransportFeedback::ParseFrom(raw_packet.data(), raw_packet.size()); EXPECT_TRUE(feedback.get() != nullptr); - adapter_->OnTransportFeedback(*feedback.get()); + adapter_->ProcessTransportFeedback(*feedback.get()); { std::vector expected_packets; expected_packets.push_back(packet_feedback); diff --git a/modules/congestion_controller/transport_feedback_adapter.cc b/modules/congestion_controller/transport_feedback_adapter.cc index 3baa3ad5a5..7f07c73d60 100644 --- a/modules/congestion_controller/transport_feedback_adapter.cc +++ b/modules/congestion_controller/transport_feedback_adapter.cc @@ -201,6 +201,7 @@ absl::optional TransportFeedbackAdapter::GetMinFeedbackLoopRtt() size_t TransportFeedbackAdapter::GetOutstandingBytes() const { rtc::CritScope cs(&lock_); - return send_time_history_.GetOutstandingBytes(local_net_id_, remote_net_id_); + return send_time_history_.GetOutstandingData(local_net_id_, remote_net_id_) + .bytes(); } } // namespace webrtc