diff --git a/call/call.cc b/call/call.cc index 28091a8b34..67d1c53ed9 100644 --- a/call/call.cc +++ b/call/call.cc @@ -58,6 +58,7 @@ #include "rtc_base/trace_event.h" #include "system_wrappers/include/clock.h" #include "system_wrappers/include/cpu_info.h" +#include "system_wrappers/include/field_trial.h" #include "system_wrappers/include/metrics.h" #include "video/call_stats.h" #include "video/send_delay_stats.h" @@ -398,6 +399,8 @@ class Call final : public webrtc::Call, MediaTransportInterface* media_transport_ RTC_GUARDED_BY(&target_observer_crit_) = nullptr; + const bool field_trial_webrtc_video_buffer_packets_with_unknown_ssrc_; + RTC_DISALLOW_COPY_AND_ASSIGN(Call); }; } // namespace internal @@ -476,7 +479,10 @@ Call::Call(Clock* clock, receive_side_cc_(clock_, transport_send->packet_router()), receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()), video_send_delay_stats_(new SendDelayStats(clock_)), - start_ms_(clock_->TimeInMilliseconds()) { + start_ms_(clock_->TimeInMilliseconds()), + field_trial_webrtc_video_buffer_packets_with_unknown_ssrc_( + webrtc::field_trial::IsEnabled( + "WebRTC-Video-BufferPacketsWithUnknownSsrc")) { RTC_DCHECK(config.event_log != nullptr); transport_send_ = std::move(transport_send); transport_send_ptr_ = transport_send_.get(); @@ -1355,6 +1361,28 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type, // which is being torned down. return DELIVERY_UNKNOWN_SSRC; } + + if (field_trial_webrtc_video_buffer_packets_with_unknown_ssrc_) { + // Check if packet arrives for a stream that requires decryption + // but does not yet have a FrameDecryptor. + // In that case buffer the packet and replay it when the frame decryptor + // is set. + // TODO(bugs.webrtc.org/10416) : Remove this check once FrameDecryptor + // is created as part of creating receive stream. + const uint32_t ssrc = parsed_packet.Ssrc(); + auto vrs = std::find_if( + video_receive_streams_.begin(), video_receive_streams_.end(), + [&](const VideoReceiveStream* stream) { + return (stream->config().rtp.remote_ssrc == ssrc || + stream->config().rtp.rtx_ssrc == ssrc); + }); + if (vrs != video_receive_streams_.end() && + (*vrs)->config().crypto_options.sframe.require_frame_encryption && + (*vrs)->config().frame_decryptor == nullptr) { + return DELIVERY_UNKNOWN_SSRC; + } + } + parsed_packet.IdentifyExtensions(it->second.extensions); NotifyBweOfReceivedPacket(parsed_packet, media_type); diff --git a/media/BUILD.gn b/media/BUILD.gn index 8316603b9c..3484b408db 100644 --- a/media/BUILD.gn +++ b/media/BUILD.gn @@ -286,6 +286,8 @@ rtc_static_library("rtc_audio_video") { "engine/payload_type_mapper.h", "engine/simulcast.cc", "engine/simulcast.h", + "engine/unhandled_packets_buffer.cc", + "engine/unhandled_packets_buffer.h", "engine/webrtc_media_engine.cc", "engine/webrtc_media_engine.h", "engine/webrtc_video_engine.cc", @@ -528,6 +530,7 @@ if (rtc_include_tests) { "engine/payload_type_mapper_unittest.cc", "engine/simulcast_encoder_adapter_unittest.cc", "engine/simulcast_unittest.cc", + "engine/unhandled_packets_buffer_unittest.cc", "engine/webrtc_media_engine_unittest.cc", "engine/webrtc_video_engine_unittest.cc", ] diff --git a/media/engine/unhandled_packets_buffer.cc b/media/engine/unhandled_packets_buffer.cc new file mode 100644 index 0000000000..58ce1dcd2e --- /dev/null +++ b/media/engine/unhandled_packets_buffer.cc @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "media/engine/unhandled_packets_buffer.h" +#include "rtc_base/logging.h" +#include "rtc_base/strings/string_builder.h" + +namespace cricket { + +UnhandledPacketsBuffer::UnhandledPacketsBuffer() { + buffer_.reserve(kMaxStashedPackets); +} + +UnhandledPacketsBuffer::~UnhandledPacketsBuffer() = default; + +// Store packet in buffer. +void UnhandledPacketsBuffer::AddPacket(uint32_t ssrc, + int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + if (buffer_.size() < kMaxStashedPackets) { + buffer_.push_back({ssrc, packet_time_us, packet}); + } else { + RTC_DCHECK_LT(insert_pos_, kMaxStashedPackets); + buffer_[insert_pos_] = {ssrc, packet_time_us, packet}; + } + insert_pos_ = (insert_pos_ + 1) % kMaxStashedPackets; +} + +// Backfill |consumer| with all stored packet related |ssrcs|. +void UnhandledPacketsBuffer::BackfillPackets( + rtc::ArrayView ssrcs, + std::function consumer) { + size_t start; + if (buffer_.size() < kMaxStashedPackets) { + start = 0; + } else { + start = insert_pos_; + } + + size_t count = 0; + std::vector remaining; + remaining.reserve(kMaxStashedPackets); + for (size_t i = 0; i < buffer_.size(); ++i) { + const size_t pos = (i + start) % kMaxStashedPackets; + + // One or maybe 2 ssrcs is expected => loop array instead of more elaborate + // scheme. + const uint32_t ssrc = buffer_[pos].ssrc; + if (std::find(ssrcs.begin(), ssrcs.end(), ssrc) != ssrcs.end()) { + ++count; + consumer(ssrc, buffer_[pos].packet_time_us, buffer_[pos].packet); + } else { + remaining.push_back(buffer_[pos]); + } + } + + insert_pos_ = 0; // insert_pos is only used when buffer is full. + buffer_.swap(remaining); +} + +} // namespace cricket diff --git a/media/engine/unhandled_packets_buffer.h b/media/engine/unhandled_packets_buffer.h new file mode 100644 index 0000000000..0db06d2a6e --- /dev/null +++ b/media/engine/unhandled_packets_buffer.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef MEDIA_ENGINE_UNHANDLED_PACKETS_BUFFER_H_ +#define MEDIA_ENGINE_UNHANDLED_PACKETS_BUFFER_H_ + +#include +#include +#include +#include +#include + +#include "rtc_base/copy_on_write_buffer.h" + +namespace cricket { + +class UnhandledPacketsBuffer { + public: + // Visible for testing. + static constexpr size_t kMaxStashedPackets = 50; + + UnhandledPacketsBuffer(); + ~UnhandledPacketsBuffer(); + + // Store packet in buffer. + void AddPacket(uint32_t ssrc, + int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet); + + // Feed all packets with |ssrcs| into |consumer|. + void BackfillPackets( + rtc::ArrayView ssrcs, + std::function consumer); + + private: + size_t insert_pos_ = 0; + struct PacketWithMetadata { + uint32_t ssrc; + int64_t packet_time_us; + rtc::CopyOnWriteBuffer packet; + }; + std::vector buffer_; +}; + +} // namespace cricket + +#endif // MEDIA_ENGINE_UNHANDLED_PACKETS_BUFFER_H_ diff --git a/media/engine/unhandled_packets_buffer_unittest.cc b/media/engine/unhandled_packets_buffer_unittest.cc new file mode 100644 index 0000000000..1a7dd5508f --- /dev/null +++ b/media/engine/unhandled_packets_buffer_unittest.cc @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "media/engine/unhandled_packets_buffer.h" + +#include +#include "absl/memory/memory.h" +#include "test/gmock.h" +#include "test/gtest.h" + +using ::testing::_; + +namespace { + +rtc::CopyOnWriteBuffer Create(int n) { + return rtc::CopyOnWriteBuffer(std::to_string(n)); +} + +constexpr int64_t kPacketTimeUs = 122; + +} // namespace + +namespace cricket { + +TEST(UnhandledPacketsBuffer, NoPackets) { + UnhandledPacketsBuffer buff; + buff.AddPacket(2, kPacketTimeUs, Create(3)); + + std::vector ssrcs = {3}; + std::vector packets; + buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + packets.push_back(packet); + }); + EXPECT_EQ(0u, packets.size()); +} + +TEST(UnhandledPacketsBuffer, OnePacket) { + UnhandledPacketsBuffer buff; + buff.AddPacket(2, kPacketTimeUs, Create(3)); + + std::vector ssrcs = {2}; + std::vector packets; + buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + packets.push_back(packet); + }); + ASSERT_EQ(1u, packets.size()); + EXPECT_EQ(Create(3), packets[0]); +} + +TEST(UnhandledPacketsBuffer, TwoPacketsTwoSsrcs) { + UnhandledPacketsBuffer buff; + buff.AddPacket(2, kPacketTimeUs, Create(3)); + buff.AddPacket(3, kPacketTimeUs, Create(4)); + + std::vector ssrcs = {2, 3}; + std::vector packets; + buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + packets.push_back(packet); + }); + ASSERT_EQ(2u, packets.size()); + EXPECT_EQ(Create(3), packets[0]); + EXPECT_EQ(Create(4), packets[1]); +} + +TEST(UnhandledPacketsBuffer, TwoPacketsTwoSsrcsOneMatch) { + UnhandledPacketsBuffer buff; + buff.AddPacket(2, kPacketTimeUs, Create(3)); + buff.AddPacket(3, kPacketTimeUs, Create(4)); + + std::vector ssrcs = {3}; + buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + EXPECT_EQ(ssrc, 3u); + EXPECT_EQ(Create(4), packet); + }); + + std::vector ssrcs_again = {2}; + buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + EXPECT_EQ(ssrc, 2u); + EXPECT_EQ(Create(3), packet); + }); +} + +TEST(UnhandledPacketsBuffer, Full) { + const size_t cnt = UnhandledPacketsBuffer::kMaxStashedPackets; + UnhandledPacketsBuffer buff; + for (size_t i = 0; i < cnt; i++) { + buff.AddPacket(2, kPacketTimeUs, Create(i)); + } + + std::vector ssrcs = {2}; + std::vector packets; + buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + packets.push_back(packet); + }); + ASSERT_EQ(cnt, packets.size()); + for (size_t i = 0; i < cnt; i++) { + EXPECT_EQ(Create(i), packets[i]); + } + + // Add a packet after backfill and check that it comes back. + buff.AddPacket(23, kPacketTimeUs, Create(1001)); + buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + EXPECT_EQ(ssrc, 23u); + EXPECT_EQ(Create(1001), packet); + }); +} + +TEST(UnhandledPacketsBuffer, Wrap) { + UnhandledPacketsBuffer buff; + size_t cnt = UnhandledPacketsBuffer::kMaxStashedPackets + 10; + for (size_t i = 0; i < cnt; i++) { + buff.AddPacket(2, kPacketTimeUs, Create(i)); + } + + std::vector ssrcs = {2}; + std::vector packets; + buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + packets.push_back(packet); + }); + for (size_t i = 0; i < packets.size(); i++) { + EXPECT_EQ(Create(i + 10), packets[i]); + } + + // Add a packet after backfill and check that it comes back. + buff.AddPacket(23, kPacketTimeUs, Create(1001)); + buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + EXPECT_EQ(ssrc, 23u); + EXPECT_EQ(Create(1001), packet); + }); +} + +TEST(UnhandledPacketsBuffer, Interleaved) { + UnhandledPacketsBuffer buff; + buff.AddPacket(2, kPacketTimeUs, Create(2)); + buff.AddPacket(3, kPacketTimeUs, Create(3)); + + std::vector ssrcs = {2}; + buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + EXPECT_EQ(ssrc, 2u); + EXPECT_EQ(Create(2), packet); + }); + + buff.AddPacket(4, kPacketTimeUs, Create(4)); + + ssrcs = {3}; + buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + EXPECT_EQ(ssrc, 3u); + EXPECT_EQ(Create(3), packet); + }); +} + +} // namespace cricket diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc index 7ee30848f9..54eeb1fcce 100644 --- a/media/engine/webrtc_video_engine.cc +++ b/media/engine/webrtc_video_engine.cc @@ -528,7 +528,12 @@ WebRtcVideoChannel::WebRtcVideoChannel( last_stats_log_ms_(-1), discard_unknown_ssrc_packets_(webrtc::field_trial::IsEnabled( "WebRTC-Video-DiscardPacketsWithUnknownSsrc")), - crypto_options_(crypto_options) { + crypto_options_(crypto_options), + unknown_ssrc_packet_buffer_( + webrtc::field_trial::IsEnabled( + "WebRTC-Video-BufferPacketsWithUnknownSsrc") + ? new UnhandledPacketsBuffer() + : nullptr) { RTC_DCHECK(thread_checker_.CalledOnValidThread()); rtcp_receiver_report_ssrc_ = kDefaultRtcpReceiverReportSsrc; @@ -1183,7 +1188,7 @@ bool WebRtcVideoChannel::AddRecvStream(const StreamParams& sp, } receive_streams_[ssrc] = new WebRtcVideoReceiveStream( - call_, sp, std::move(config), decoder_factory_, default_stream, + this, call_, sp, std::move(config), decoder_factory_, default_stream, recv_codecs_, flexfec_config); return true; @@ -1375,12 +1380,17 @@ void WebRtcVideoChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet, break; } - if (discard_unknown_ssrc_packets_) { + uint32_t ssrc = 0; + if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) { return; } - uint32_t ssrc = 0; - if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) { + if (unknown_ssrc_packet_buffer_) { + unknown_ssrc_packet_buffer_->AddPacket(ssrc, packet_time_us, packet); + return; + } + + if (discard_unknown_ssrc_packets_) { return; } @@ -1420,6 +1430,52 @@ void WebRtcVideoChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet, } } +void WebRtcVideoChannel::BackfillBufferedPackets( + rtc::ArrayView ssrcs) { + RTC_DCHECK_RUN_ON(&thread_checker_); + if (!unknown_ssrc_packet_buffer_) { + return; + } + + int delivery_ok_cnt = 0; + int delivery_unknown_ssrc_cnt = 0; + int delivery_packet_error_cnt = 0; + webrtc::PacketReceiver* receiver = this->call_->Receiver(); + unknown_ssrc_packet_buffer_->BackfillPackets( + ssrcs, [&](uint32_t ssrc, int64_t packet_time_us, + rtc::CopyOnWriteBuffer packet) { + switch (receiver->DeliverPacket(webrtc::MediaType::VIDEO, packet, + packet_time_us)) { + case webrtc::PacketReceiver::DELIVERY_OK: + delivery_ok_cnt++; + break; + case webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC: + delivery_unknown_ssrc_cnt++; + break; + case webrtc::PacketReceiver::DELIVERY_PACKET_ERROR: + delivery_packet_error_cnt++; + break; + } + }); + rtc::StringBuilder out; + out << "[ "; + for (uint32_t ssrc : ssrcs) { + out << std::to_string(ssrc) << " "; + } + out << "]"; + auto level = rtc::LS_INFO; + if (delivery_unknown_ssrc_cnt > 0 || delivery_packet_error_cnt > 0) { + level = rtc::LS_ERROR; + } + int total = + delivery_ok_cnt + delivery_unknown_ssrc_cnt + delivery_packet_error_cnt; + RTC_LOG_V(level) << "Backfilled " << total + << " packets for ssrcs: " << out.Release() + << " ok: " << delivery_ok_cnt + << " error: " << delivery_packet_error_cnt + << " unknown: " << delivery_unknown_ssrc_cnt; +} + void WebRtcVideoChannel::OnRtcpReceived(rtc::CopyOnWriteBuffer packet, int64_t packet_time_us) { RTC_DCHECK_RUN_ON(&thread_checker_); @@ -1909,6 +1965,9 @@ void WebRtcVideoChannel::WebRtcVideoSendStream::SetFrameEncryptor( RTC_DCHECK_RUN_ON(&thread_checker_); parameters_.config.frame_encryptor = frame_encryptor; if (stream_) { + RTC_LOG(LS_INFO) + << "RecreateWebRtcStream (send) because of SetFrameEncryptor, ssrc=" + << parameters_.config.rtp.ssrcs[0]; RecreateWebRtcStream(); } } @@ -2237,6 +2296,7 @@ void WebRtcVideoChannel::WebRtcVideoSendStream::RecreateWebRtcStream() { } WebRtcVideoChannel::WebRtcVideoReceiveStream::WebRtcVideoReceiveStream( + WebRtcVideoChannel* channel, webrtc::Call* call, const StreamParams& sp, webrtc::VideoReceiveStream::Config config, @@ -2244,7 +2304,8 @@ WebRtcVideoChannel::WebRtcVideoReceiveStream::WebRtcVideoReceiveStream( bool default_stream, const std::vector& recv_codecs, const webrtc::FlexfecReceiveStream::Config& flexfec_config) - : call_(call), + : channel_(channel), + call_(call), stream_params_(sp), stream_(NULL), default_stream_(default_stream), @@ -2440,6 +2501,16 @@ void WebRtcVideoChannel::WebRtcVideoReceiveStream::RecreateWebRtcVideoStream() { } MaybeAssociateFlexfecWithVideo(); stream_->Start(); + + if (webrtc::field_trial::IsEnabled( + "WebRTC-Video-BufferPacketsWithUnknownSsrc")) { + // TODO(bugs.webrtc.org/10416) : Remove this check and backfill + // when the stream is created (i.e remote check for frame_decryptor) + // once FrameDecryptor is created as part of creating receive stream. + if (config_.frame_decryptor) { + channel_->BackfillBufferedPackets(stream_params_.ssrcs); + } + } } void WebRtcVideoChannel::WebRtcVideoReceiveStream:: @@ -2496,6 +2567,9 @@ void WebRtcVideoChannel::WebRtcVideoReceiveStream::SetFrameDecryptor( rtc::scoped_refptr frame_decryptor) { config_.frame_decryptor = frame_decryptor; if (stream_) { + RTC_LOG(LS_INFO) + << "RecreateWebRtcStream (recv) because of SetFrameDecryptor, " + << "remote_ssrc=" << config_.rtp.remote_ssrc; RecreateWebRtcVideoStream(); } } diff --git a/media/engine/webrtc_video_engine.h b/media/engine/webrtc_video_engine.h index f3d4104973..0b209873c8 100644 --- a/media/engine/webrtc_video_engine.h +++ b/media/engine/webrtc_video_engine.h @@ -29,6 +29,7 @@ #include "call/video_receive_stream.h" #include "call/video_send_stream.h" #include "media/base/media_engine.h" +#include "media/engine/unhandled_packets_buffer.h" #include "rtc_base/async_invoker.h" #include "rtc_base/critical_section.h" #include "rtc_base/network_route.h" @@ -199,6 +200,10 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport { std::vector GetSources(uint32_t ssrc) const override; + // Take the buffered packets for |ssrcs| and feed them into DeliverPacket. + // This method does nothing unless unknown_ssrc_packet_buffer_ is configured. + void BackfillBufferedPackets(rtc::ArrayView ssrcs); + private: class WebRtcVideoReceiveStream; struct VideoCodecSettings { @@ -375,6 +380,7 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport { : public rtc::VideoSinkInterface { public: WebRtcVideoReceiveStream( + WebRtcVideoChannel* channel, webrtc::Call* call, const StreamParams& sp, webrtc::VideoReceiveStream::Config config, @@ -425,6 +431,7 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport { std::string GetCodecNameFromPayloadType(int payload_type); + WebRtcVideoChannel* const channel_; webrtc::Call* const call_; const StreamParams stream_params_; @@ -539,6 +546,10 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport { // Per peer connection crypto options that last for the lifetime of the peer // connection. const webrtc::CryptoOptions crypto_options_ RTC_GUARDED_BY(thread_checker_); + + // Buffer for unhandled packets. + std::unique_ptr unknown_ssrc_packet_buffer_ + RTC_GUARDED_BY(thread_checker_); }; class EncoderStreamFactory