diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc index c7998b2cd8..48072d0a0b 100644 --- a/webrtc/call/call.cc +++ b/webrtc/call/call.cc @@ -702,8 +702,9 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( RTC_DCHECK(configuration_thread_checker_.CalledOnValidThread()); RecoveredPacketReceiver* recovered_packet_receiver = this; - FlexfecReceiveStreamImpl* receive_stream = - new FlexfecReceiveStreamImpl(config, recovered_packet_receiver); + FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl( + config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(), + module_process_thread_.get()); { WriteLockScoped write_lock(*receive_crit_); @@ -1165,10 +1166,9 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, ParseRtpPacket(packet, length, packet_time); if (parsed_packet) { NotifyBweOfReceivedPacket(*parsed_packet); - auto status = - it->second->AddAndProcessReceivedPacket(std::move(*parsed_packet)) - ? DELIVERY_OK - : DELIVERY_PACKET_ERROR; + auto status = it->second->AddAndProcessReceivedPacket(*parsed_packet) + ? DELIVERY_OK + : DELIVERY_PACKET_ERROR; if (status == DELIVERY_OK) event_log_->LogRtpHeader(kIncomingPacket, media_type, packet, length); return status; diff --git a/webrtc/call/flexfec_receive_stream_impl.cc b/webrtc/call/flexfec_receive_stream_impl.cc index 95bcfc14dc..f5272c9029 100644 --- a/webrtc/call/flexfec_receive_stream_impl.cc +++ b/webrtc/call/flexfec_receive_stream_impl.cc @@ -10,10 +10,16 @@ #include "webrtc/call/flexfec_receive_stream_impl.h" -#include +#include #include "webrtc/base/checks.h" #include "webrtc/base/logging.h" +#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h" +#include "webrtc/modules/rtp_rtcp/include/receive_statistics.h" +#include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h" +#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h" +#include "webrtc/modules/utility/include/process_thread.h" +#include "webrtc/system_wrappers/include/clock.h" namespace webrtc { @@ -97,33 +103,77 @@ std::unique_ptr MaybeCreateFlexfecReceiver( recovered_packet_receiver)); } +std::unique_ptr CreateRtpRtcpModule( + ReceiveStatistics* receive_statistics, + Transport* rtcp_send_transport, + RtcpRttStats* rtt_stats) { + RtpRtcp::Configuration configuration; + configuration.audio = false; + configuration.receiver_only = true; + configuration.clock = Clock::GetRealTimeClock(); + configuration.receive_statistics = receive_statistics; + configuration.outgoing_transport = rtcp_send_transport; + configuration.rtt_stats = rtt_stats; + std::unique_ptr rtp_rtcp(RtpRtcp::CreateRtpRtcp(configuration)); + return rtp_rtcp; +} + } // namespace FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl( const Config& config, - RecoveredPacketReceiver* recovered_packet_receiver) - : started_(false), - config_(config), - receiver_( - MaybeCreateFlexfecReceiver(config_, recovered_packet_receiver)) { + RecoveredPacketReceiver* recovered_packet_receiver, + RtcpRttStats* rtt_stats, + ProcessThread* process_thread) + : config_(config), + started_(false), + receiver_(MaybeCreateFlexfecReceiver(config_, recovered_packet_receiver)), + rtp_receive_statistics_( + ReceiveStatistics::Create(Clock::GetRealTimeClock())), + rtp_rtcp_(CreateRtpRtcpModule(rtp_receive_statistics_.get(), + config_.rtcp_send_transport, + rtt_stats)), + process_thread_(process_thread) { LOG(LS_INFO) << "FlexfecReceiveStreamImpl: " << config_.ToString(); + + // RTCP reporting. + rtp_rtcp_->SetSendingMediaStatus(false); + rtp_rtcp_->SetRTCPStatus(config_.rtcp_mode); + rtp_rtcp_->SetSSRC(config_.local_ssrc); + process_thread_->RegisterModule(rtp_rtcp_.get()); } FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() { LOG(LS_INFO) << "~FlexfecReceiveStreamImpl: " << config_.ToString(); Stop(); + process_thread_->DeRegisterModule(rtp_rtcp_.get()); } bool FlexfecReceiveStreamImpl::AddAndProcessReceivedPacket( - RtpPacketReceived packet) { + const RtpPacketReceived& packet) { { rtc::CritScope cs(&crit_); if (!started_) return false; } + if (!receiver_) return false; - return receiver_->AddAndProcessReceivedPacket(std::move(packet)); + + if (!receiver_->AddAndProcessReceivedPacket(packet)) + return false; + + // Do not report media packets in the RTCP RRs generated by |rtp_rtcp_|. + if (packet.Ssrc() == config_.remote_ssrc) { + RTPHeader header; + packet.GetHeader(&header); + // FlexFEC packets are never retransmitted. + const bool kNotRetransmitted = false; + rtp_receive_statistics_->IncomingPacket(header, packet.size(), + kNotRetransmitted); + } + + return true; } void FlexfecReceiveStreamImpl::Start() { diff --git a/webrtc/call/flexfec_receive_stream_impl.h b/webrtc/call/flexfec_receive_stream_impl.h index 7267dc0e45..36ea623c2e 100644 --- a/webrtc/call/flexfec_receive_stream_impl.h +++ b/webrtc/call/flexfec_receive_stream_impl.h @@ -12,25 +12,31 @@ #define WEBRTC_CALL_FLEXFEC_RECEIVE_STREAM_IMPL_H_ #include -#include -#include "webrtc/base/basictypes.h" #include "webrtc/base/criticalsection.h" #include "webrtc/call/flexfec_receive_stream.h" -#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h" -#include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h" namespace webrtc { +class FlexfecReceiver; +class ProcessThread; +class ReceiveStatistics; +class RecoveredPacketReceiver; +class RtcpRttStats; +class RtpPacketReceived; +class RtpRtcp; + class FlexfecReceiveStreamImpl : public FlexfecReceiveStream { public: FlexfecReceiveStreamImpl(const Config& config, - RecoveredPacketReceiver* recovered_packet_receiver); + RecoveredPacketReceiver* recovered_packet_receiver, + RtcpRttStats* rtt_stats, + ProcessThread* process_thread); ~FlexfecReceiveStreamImpl() override; const Config& GetConfig() const { return config_; } - bool AddAndProcessReceivedPacket(RtpPacketReceived packet); + bool AddAndProcessReceivedPacket(const RtpPacketReceived& packet); // Implements FlexfecReceiveStream. void Start() override; @@ -38,11 +44,18 @@ class FlexfecReceiveStreamImpl : public FlexfecReceiveStream { Stats GetStats() const override; private: - rtc::CriticalSection crit_; - bool started_ GUARDED_BY(crit_); - + // Config. const Config config_; + bool started_ GUARDED_BY(crit_); + rtc::CriticalSection crit_; + + // Erasure code interfacing. const std::unique_ptr receiver_; + + // RTCP reporting. + const std::unique_ptr rtp_receive_statistics_; + const std::unique_ptr rtp_rtcp_; + ProcessThread* process_thread_; }; } // namespace webrtc diff --git a/webrtc/call/flexfec_receive_stream_unittest.cc b/webrtc/call/flexfec_receive_stream_unittest.cc index 24026365d3..2365811edb 100644 --- a/webrtc/call/flexfec_receive_stream_unittest.cc +++ b/webrtc/call/flexfec_receive_stream_unittest.cc @@ -12,10 +12,13 @@ #include "webrtc/base/array_view.h" #include "webrtc/call/flexfec_receive_stream_impl.h" +#include "webrtc/modules/pacing/packet_router.h" #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h" #include "webrtc/modules/rtp_rtcp/mocks/mock_recovered_packet_receiver.h" +#include "webrtc/modules/rtp_rtcp/mocks/mock_rtcp_rtt_stats.h" #include "webrtc/modules/rtp_rtcp/source/byte_io.h" #include "webrtc/modules/rtp_rtcp/source/rtp_header_extensions.h" +#include "webrtc/modules/utility/include/mock/mock_process_thread.h" #include "webrtc/test/gmock.h" #include "webrtc/test/gtest.h" #include "webrtc/test/mock_transport.h" @@ -74,11 +77,16 @@ class FlexfecReceiveStreamTest : public ::testing::Test { protected: FlexfecReceiveStreamTest() : config_(CreateDefaultConfig(&rtcp_send_transport_)), - receive_stream_(config_, &recovered_packet_receiver_) {} + receive_stream_(config_, + &recovered_packet_receiver_, + &rtt_stats_, + &process_thread_) {} + MockTransport rtcp_send_transport_; FlexfecReceiveStream::Config config_; MockRecoveredPacketReceiver recovered_packet_receiver_; - MockTransport rtcp_send_transport_; + MockRtcpRttStats rtt_stats_; + MockProcessThread process_thread_; FlexfecReceiveStreamImpl receive_stream_; }; @@ -126,7 +134,8 @@ TEST_F(FlexfecReceiveStreamTest, RecoversPacketWhenStarted) { // clang-format on testing::StrictMock recovered_packet_receiver; - FlexfecReceiveStreamImpl receive_stream(config_, &recovered_packet_receiver); + FlexfecReceiveStreamImpl receive_stream(config_, &recovered_packet_receiver, + &rtt_stats_, &process_thread_); // Do not call back before being started. receive_stream.AddAndProcessReceivedPacket(ParsePacket(kFlexfecPacket)); diff --git a/webrtc/media/engine/webrtcvideoengine2.cc b/webrtc/media/engine/webrtcvideoengine2.cc index 996e2bdb00..b0d3b89b8a 100644 --- a/webrtc/media/engine/webrtcvideoengine2.cc +++ b/webrtc/media/engine/webrtcvideoengine2.cc @@ -2219,6 +2219,7 @@ void WebRtcVideoChannel2::WebRtcVideoReceiveStream::SetLocalSsrc( } config_.rtp.local_ssrc = local_ssrc; + flexfec_config_.local_ssrc = local_ssrc; LOG(LS_INFO) << "RecreateWebRtcStream (recv) because of SetLocalSsrc; local_ssrc=" << local_ssrc; @@ -2246,6 +2247,7 @@ void WebRtcVideoChannel2::WebRtcVideoReceiveStream::SetFeedbackParameters( config_.rtp.nack.rtp_history_ms = nack_history_ms; config_.rtp.transport_cc = transport_cc_enabled; config_.rtp.rtcp_mode = rtcp_mode; + flexfec_config_.rtcp_mode = rtcp_mode; LOG(LS_INFO) << "RecreateWebRtcStream (recv) because of SetFeedbackParameters; nack=" << nack_enabled << ", remb=" << remb_enabled diff --git a/webrtc/modules/rtp_rtcp/include/flexfec_receiver.h b/webrtc/modules/rtp_rtcp/include/flexfec_receiver.h index 9ad093133c..a986fbb822 100644 --- a/webrtc/modules/rtp_rtcp/include/flexfec_receiver.h +++ b/webrtc/modules/rtp_rtcp/include/flexfec_receiver.h @@ -43,13 +43,13 @@ class FlexfecReceiver { // Inserts a received packet (can be either media or FlexFEC) into the // internal buffer, and sends the received packets to the erasure code. // All newly recovered packets are sent back through the callback. - bool AddAndProcessReceivedPacket(RtpPacketReceived packet); + bool AddAndProcessReceivedPacket(const RtpPacketReceived& packet); // Returns a counter describing the added and recovered packets. FecPacketCounter GetPacketCounter() const; private: - bool AddReceivedPacket(RtpPacketReceived packet); + bool AddReceivedPacket(const RtpPacketReceived& packet); bool ProcessReceivedPackets(); // Config. diff --git a/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc b/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc index 80f5b1f3b6..b81a039954 100644 --- a/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc +++ b/webrtc/modules/rtp_rtcp/source/flexfec_receiver.cc @@ -45,7 +45,8 @@ FlexfecReceiver::FlexfecReceiver( FlexfecReceiver::~FlexfecReceiver() = default; -bool FlexfecReceiver::AddAndProcessReceivedPacket(RtpPacketReceived packet) { +bool FlexfecReceiver::AddAndProcessReceivedPacket( + const RtpPacketReceived& packet) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); if (!AddReceivedPacket(std::move(packet))) { return false; @@ -58,7 +59,7 @@ FecPacketCounter FlexfecReceiver::GetPacketCounter() const { return packet_counter_; } -bool FlexfecReceiver::AddReceivedPacket(RtpPacketReceived packet) { +bool FlexfecReceiver::AddReceivedPacket(const RtpPacketReceived& packet) { RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_); // RTP packets with a full base header (12 bytes), but without payload, diff --git a/webrtc/test/call_test.cc b/webrtc/test/call_test.cc index 290992a9ca..9521d3430d 100644 --- a/webrtc/test/call_test.cc +++ b/webrtc/test/call_test.cc @@ -279,6 +279,7 @@ void CallTest::CreateMatchingReceiveConfigs(Transport* rtcp_send_transport) { config.payload_type = kFlexfecPayloadType; config.remote_ssrc = kFlexfecSendSsrc; config.protected_media_ssrcs = {kVideoSendSsrcs[0]}; + config.local_ssrc = kReceiverLocalVideoSsrc; for (const RtpExtension& extension : video_send_config_.rtp.extensions) config.rtp_header_extensions.push_back(extension); flexfec_receive_configs_.push_back(config); diff --git a/webrtc/test/call_test.h b/webrtc/test/call_test.h index 03bc5bb3b5..b4101a4ca6 100644 --- a/webrtc/test/call_test.h +++ b/webrtc/test/call_test.h @@ -56,7 +56,6 @@ class CallTest : public ::testing::Test { static const uint32_t kFlexfecSendSsrc; static const uint32_t kReceiverLocalVideoSsrc; static const uint32_t kReceiverLocalAudioSsrc; - static const uint32_t kReceiverLocalFlexfecSsrc; static const int kNackRtpHistoryMs; protected: diff --git a/webrtc/video/end_to_end_tests.cc b/webrtc/video/end_to_end_tests.cc index 946d81d5df..502851bbce 100644 --- a/webrtc/video/end_to_end_tests.cc +++ b/webrtc/video/end_to_end_tests.cc @@ -705,84 +705,126 @@ TEST_P(EndToEndTest, CanReceiveUlpfec) { RunBaseTest(&test); } -TEST_P(EndToEndTest, CanReceiveFlexfec) { - class FlexfecRenderObserver : public test::EndToEndTest, - public rtc::VideoSinkInterface { - public: - FlexfecRenderObserver() - : EndToEndTest(kDefaultTimeoutMs), random_(0xcafef00d1) {} +class FlexfecRenderObserver : public test::EndToEndTest, + public rtc::VideoSinkInterface { + public: + static constexpr uint32_t kVideoLocalSsrc = 123; + static constexpr uint32_t kFlexfecLocalSsrc = 456; - size_t GetNumFlexfecStreams() const override { return 1; } + explicit FlexfecRenderObserver(bool expect_flexfec_rtcp) + : test::EndToEndTest(test::CallTest::kDefaultTimeoutMs), + expect_flexfec_rtcp_(expect_flexfec_rtcp), + received_flexfec_rtcp_(false), + random_(0xcafef00d1) {} - private: - Action OnSendRtp(const uint8_t* packet, size_t length) override { - rtc::CritScope lock(&crit_); - RTPHeader header; - EXPECT_TRUE(parser_->Parse(packet, length, &header)); + size_t GetNumFlexfecStreams() const override { return 1; } - uint8_t payload_type = header.payloadType; - if (payload_type != kFakeVideoSendPayloadType) { - EXPECT_EQ(kFlexfecPayloadType, payload_type); - } + private: + Action OnSendRtp(const uint8_t* packet, size_t length) override { + rtc::CritScope lock(&crit_); + RTPHeader header; + EXPECT_TRUE(parser_->Parse(packet, length, &header)); - // Is this a retransmitted media packet? From the perspective of FEC, this - // packet is then no longer dropped, so remove it from the list of - // dropped packets. - if (payload_type == kFakeVideoSendPayloadType) { - auto seq_num_it = dropped_sequence_numbers_.find(header.sequenceNumber); - if (seq_num_it != dropped_sequence_numbers_.end()) { - dropped_sequence_numbers_.erase(seq_num_it); - auto ts_it = dropped_timestamps_.find(header.timestamp); - EXPECT_NE(ts_it, dropped_timestamps_.end()); - dropped_timestamps_.erase(ts_it); - - return SEND_PACKET; - } - } - - // Simulate 5% packet loss. Record what media packets, and corresponding - // timestamps, that were dropped. - if (random_.Rand(1, 100) <= 5) { - if (payload_type == kFakeVideoSendPayloadType) { - dropped_sequence_numbers_.insert(header.sequenceNumber); - dropped_timestamps_.insert(header.timestamp); - } - - return DROP_PACKET; - } - - return SEND_PACKET; + uint8_t payload_type = header.payloadType; + if (payload_type != test::CallTest::kFakeVideoSendPayloadType) { + EXPECT_EQ(test::CallTest::kFlexfecPayloadType, payload_type); } - void OnFrame(const VideoFrame& video_frame) override { - rtc::CritScope lock(&crit_); - // Rendering frame with timestamp of packet that was dropped -> FEC - // protection worked. - auto it = dropped_timestamps_.find(video_frame.timestamp()); - if (it != dropped_timestamps_.end()) + // Is this a retransmitted media packet? From the perspective of FEC, this + // packet is then no longer dropped, so remove it from the list of + // dropped packets. + if (payload_type == test::CallTest::kFakeVideoSendPayloadType) { + auto seq_num_it = dropped_sequence_numbers_.find(header.sequenceNumber); + if (seq_num_it != dropped_sequence_numbers_.end()) { + dropped_sequence_numbers_.erase(seq_num_it); + auto ts_it = dropped_timestamps_.find(header.timestamp); + EXPECT_NE(ts_it, dropped_timestamps_.end()); + dropped_timestamps_.erase(ts_it); + + return SEND_PACKET; + } + } + + // Simulate 5% packet loss. Record what media packets, and corresponding + // timestamps, that were dropped. + if (random_.Rand(1, 100) <= 5) { + if (payload_type == test::CallTest::kFakeVideoSendPayloadType) { + dropped_sequence_numbers_.insert(header.sequenceNumber); + dropped_timestamps_.insert(header.timestamp); + } + + return DROP_PACKET; + } + + return SEND_PACKET; + } + + Action OnReceiveRtcp(const uint8_t* data, size_t length) override { + test::RtcpPacketParser parser; + + parser.Parse(data, length); + if (parser.sender_ssrc() == kFlexfecLocalSsrc) { + EXPECT_EQ(1, parser.receiver_report()->num_packets()); + const std::vector& report_blocks = + parser.receiver_report()->report_blocks(); + if (!report_blocks.empty()) { + EXPECT_EQ(1U, report_blocks.size()); + EXPECT_EQ(test::CallTest::kFlexfecSendSsrc, + report_blocks[0].source_ssrc()); + received_flexfec_rtcp_ = true; + } + } + + return SEND_PACKET; + } + + void OnFrame(const VideoFrame& video_frame) override { + rtc::CritScope lock(&crit_); + // Rendering frame with timestamp of packet that was dropped -> FEC + // protection worked. + auto it = dropped_timestamps_.find(video_frame.timestamp()); + if (it != dropped_timestamps_.end()) { + if (!expect_flexfec_rtcp_ || received_flexfec_rtcp_) { observation_complete_.Set(); + } } + } - void ModifyVideoConfigs( - VideoSendStream::Config* send_config, - std::vector* receive_configs, - VideoEncoderConfig* encoder_config) override { - (*receive_configs)[0].renderer = this; - } + void ModifyVideoConfigs( + VideoSendStream::Config* send_config, + std::vector* receive_configs, + VideoEncoderConfig* encoder_config) override { + (*receive_configs)[0].rtp.local_ssrc = kVideoLocalSsrc; + (*receive_configs)[0].renderer = this; + } - void PerformTest() override { - EXPECT_TRUE(Wait()) - << "Timed out waiting for dropped frames to be rendered."; - } + void ModifyFlexfecConfigs( + std::vector* receive_configs) override { + (*receive_configs)[0].local_ssrc = kFlexfecLocalSsrc; + } - rtc::CriticalSection crit_; - std::set dropped_sequence_numbers_ GUARDED_BY(crit_); - // Since several packets can have the same timestamp a multiset is used - // instead of a set. - std::multiset dropped_timestamps_ GUARDED_BY(crit_); - Random random_; - } test; + void PerformTest() override { + EXPECT_TRUE(Wait()) + << "Timed out waiting for dropped frames to be rendered."; + } + rtc::CriticalSection crit_; + std::set dropped_sequence_numbers_ GUARDED_BY(crit_); + // Since several packets can have the same timestamp a multiset is used + // instead of a set. + std::multiset dropped_timestamps_ GUARDED_BY(crit_); + bool expect_flexfec_rtcp_; + bool received_flexfec_rtcp_; + Random random_; +}; + +TEST_P(EndToEndTest, ReceivesFlexfec) { + FlexfecRenderObserver test(false); + RunBaseTest(&test); +} + +TEST_P(EndToEndTest, ReceivesFlexfecAndSendsCorrespondingRtcp) { + FlexfecRenderObserver test(true); RunBaseTest(&test); } diff --git a/webrtc/video/video_quality_test.cc b/webrtc/video/video_quality_test.cc index e3ae616df2..c438274fca 100644 --- a/webrtc/video/video_quality_test.cc +++ b/webrtc/video/video_quality_test.cc @@ -1121,6 +1121,7 @@ void VideoQualityTest::SetupVideo(Transport* send_transport, flexfec_receive_config.remote_ssrc = video_send_config_.rtp.flexfec.ssrc; flexfec_receive_config.protected_media_ssrcs = video_send_config_.rtp.flexfec.protected_media_ssrcs; + flexfec_receive_config.local_ssrc = kReceiverLocalVideoSsrc; flexfec_receive_config.transport_cc = params_.call.send_side_bwe; if (params_.call.send_side_bwe) { flexfec_receive_config.rtp_header_extensions.push_back(