Improve handling of packets with unknown ssrc.

Add a feature (gated by field trial) that stores
packets with unknown ssrc in a circular buffer
and replays them once a receive stream with matching
ssrc is created.

This improves situation where media is incoming
but signaling or SetFrameDecryptor is slow.

BUG=webrtc:10405

Change-Id: I7c7b2f4bd96c942c09e96db0cdae4ce5efef2541
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/127543
Commit-Queue: Jonas Oreland <jonaso@webrtc.org>
Reviewed-by: Rasmus Brandt <brandtr@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27159}
This commit is contained in:
Jonas Oreland
2019-03-18 10:59:40 +01:00
committed by Commit Bot
parent 0611a15c29
commit 6d83592367
7 changed files with 415 additions and 7 deletions

View File

@ -58,6 +58,7 @@
#include "rtc_base/trace_event.h"
#include "system_wrappers/include/clock.h"
#include "system_wrappers/include/cpu_info.h"
#include "system_wrappers/include/field_trial.h"
#include "system_wrappers/include/metrics.h"
#include "video/call_stats.h"
#include "video/send_delay_stats.h"
@ -398,6 +399,8 @@ class Call final : public webrtc::Call,
MediaTransportInterface* media_transport_
RTC_GUARDED_BY(&target_observer_crit_) = nullptr;
const bool field_trial_webrtc_video_buffer_packets_with_unknown_ssrc_;
RTC_DISALLOW_COPY_AND_ASSIGN(Call);
};
} // namespace internal
@ -476,7 +479,10 @@ Call::Call(Clock* clock,
receive_side_cc_(clock_, transport_send->packet_router()),
receive_time_calculator_(ReceiveTimeCalculator::CreateFromFieldTrial()),
video_send_delay_stats_(new SendDelayStats(clock_)),
start_ms_(clock_->TimeInMilliseconds()) {
start_ms_(clock_->TimeInMilliseconds()),
field_trial_webrtc_video_buffer_packets_with_unknown_ssrc_(
webrtc::field_trial::IsEnabled(
"WebRTC-Video-BufferPacketsWithUnknownSsrc")) {
RTC_DCHECK(config.event_log != nullptr);
transport_send_ = std::move(transport_send);
transport_send_ptr_ = transport_send_.get();
@ -1355,6 +1361,28 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
// which is being torned down.
return DELIVERY_UNKNOWN_SSRC;
}
if (field_trial_webrtc_video_buffer_packets_with_unknown_ssrc_) {
// Check if packet arrives for a stream that requires decryption
// but does not yet have a FrameDecryptor.
// In that case buffer the packet and replay it when the frame decryptor
// is set.
// TODO(bugs.webrtc.org/10416) : Remove this check once FrameDecryptor
// is created as part of creating receive stream.
const uint32_t ssrc = parsed_packet.Ssrc();
auto vrs = std::find_if(
video_receive_streams_.begin(), video_receive_streams_.end(),
[&](const VideoReceiveStream* stream) {
return (stream->config().rtp.remote_ssrc == ssrc ||
stream->config().rtp.rtx_ssrc == ssrc);
});
if (vrs != video_receive_streams_.end() &&
(*vrs)->config().crypto_options.sframe.require_frame_encryption &&
(*vrs)->config().frame_decryptor == nullptr) {
return DELIVERY_UNKNOWN_SSRC;
}
}
parsed_packet.IdentifyExtensions(it->second.extensions);
NotifyBweOfReceivedPacket(parsed_packet, media_type);

View File

@ -286,6 +286,8 @@ rtc_static_library("rtc_audio_video") {
"engine/payload_type_mapper.h",
"engine/simulcast.cc",
"engine/simulcast.h",
"engine/unhandled_packets_buffer.cc",
"engine/unhandled_packets_buffer.h",
"engine/webrtc_media_engine.cc",
"engine/webrtc_media_engine.h",
"engine/webrtc_video_engine.cc",
@ -528,6 +530,7 @@ if (rtc_include_tests) {
"engine/payload_type_mapper_unittest.cc",
"engine/simulcast_encoder_adapter_unittest.cc",
"engine/simulcast_unittest.cc",
"engine/unhandled_packets_buffer_unittest.cc",
"engine/webrtc_media_engine_unittest.cc",
"engine/webrtc_video_engine_unittest.cc",
]

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "media/engine/unhandled_packets_buffer.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/string_builder.h"
namespace cricket {
UnhandledPacketsBuffer::UnhandledPacketsBuffer() {
buffer_.reserve(kMaxStashedPackets);
}
UnhandledPacketsBuffer::~UnhandledPacketsBuffer() = default;
// Store packet in buffer.
void UnhandledPacketsBuffer::AddPacket(uint32_t ssrc,
int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
if (buffer_.size() < kMaxStashedPackets) {
buffer_.push_back({ssrc, packet_time_us, packet});
} else {
RTC_DCHECK_LT(insert_pos_, kMaxStashedPackets);
buffer_[insert_pos_] = {ssrc, packet_time_us, packet};
}
insert_pos_ = (insert_pos_ + 1) % kMaxStashedPackets;
}
// Backfill |consumer| with all stored packet related |ssrcs|.
void UnhandledPacketsBuffer::BackfillPackets(
rtc::ArrayView<const uint32_t> ssrcs,
std::function<void(uint32_t, int64_t, rtc::CopyOnWriteBuffer)> consumer) {
size_t start;
if (buffer_.size() < kMaxStashedPackets) {
start = 0;
} else {
start = insert_pos_;
}
size_t count = 0;
std::vector<PacketWithMetadata> remaining;
remaining.reserve(kMaxStashedPackets);
for (size_t i = 0; i < buffer_.size(); ++i) {
const size_t pos = (i + start) % kMaxStashedPackets;
// One or maybe 2 ssrcs is expected => loop array instead of more elaborate
// scheme.
const uint32_t ssrc = buffer_[pos].ssrc;
if (std::find(ssrcs.begin(), ssrcs.end(), ssrc) != ssrcs.end()) {
++count;
consumer(ssrc, buffer_[pos].packet_time_us, buffer_[pos].packet);
} else {
remaining.push_back(buffer_[pos]);
}
}
insert_pos_ = 0; // insert_pos is only used when buffer is full.
buffer_.swap(remaining);
}
} // namespace cricket

View File

@ -0,0 +1,54 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MEDIA_ENGINE_UNHANDLED_PACKETS_BUFFER_H_
#define MEDIA_ENGINE_UNHANDLED_PACKETS_BUFFER_H_
#include <stdint.h>
#include <functional>
#include <tuple>
#include <utility>
#include <vector>
#include "rtc_base/copy_on_write_buffer.h"
namespace cricket {
class UnhandledPacketsBuffer {
public:
// Visible for testing.
static constexpr size_t kMaxStashedPackets = 50;
UnhandledPacketsBuffer();
~UnhandledPacketsBuffer();
// Store packet in buffer.
void AddPacket(uint32_t ssrc,
int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet);
// Feed all packets with |ssrcs| into |consumer|.
void BackfillPackets(
rtc::ArrayView<const uint32_t> ssrcs,
std::function<void(uint32_t, int64_t, rtc::CopyOnWriteBuffer)> consumer);
private:
size_t insert_pos_ = 0;
struct PacketWithMetadata {
uint32_t ssrc;
int64_t packet_time_us;
rtc::CopyOnWriteBuffer packet;
};
std::vector<PacketWithMetadata> buffer_;
};
} // namespace cricket
#endif // MEDIA_ENGINE_UNHANDLED_PACKETS_BUFFER_H_

View File

@ -0,0 +1,170 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "media/engine/unhandled_packets_buffer.h"
#include <memory>
#include "absl/memory/memory.h"
#include "test/gmock.h"
#include "test/gtest.h"
using ::testing::_;
namespace {
rtc::CopyOnWriteBuffer Create(int n) {
return rtc::CopyOnWriteBuffer(std::to_string(n));
}
constexpr int64_t kPacketTimeUs = 122;
} // namespace
namespace cricket {
TEST(UnhandledPacketsBuffer, NoPackets) {
UnhandledPacketsBuffer buff;
buff.AddPacket(2, kPacketTimeUs, Create(3));
std::vector<uint32_t> ssrcs = {3};
std::vector<rtc::CopyOnWriteBuffer> packets;
buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
packets.push_back(packet);
});
EXPECT_EQ(0u, packets.size());
}
TEST(UnhandledPacketsBuffer, OnePacket) {
UnhandledPacketsBuffer buff;
buff.AddPacket(2, kPacketTimeUs, Create(3));
std::vector<uint32_t> ssrcs = {2};
std::vector<rtc::CopyOnWriteBuffer> packets;
buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
packets.push_back(packet);
});
ASSERT_EQ(1u, packets.size());
EXPECT_EQ(Create(3), packets[0]);
}
TEST(UnhandledPacketsBuffer, TwoPacketsTwoSsrcs) {
UnhandledPacketsBuffer buff;
buff.AddPacket(2, kPacketTimeUs, Create(3));
buff.AddPacket(3, kPacketTimeUs, Create(4));
std::vector<uint32_t> ssrcs = {2, 3};
std::vector<rtc::CopyOnWriteBuffer> packets;
buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
packets.push_back(packet);
});
ASSERT_EQ(2u, packets.size());
EXPECT_EQ(Create(3), packets[0]);
EXPECT_EQ(Create(4), packets[1]);
}
TEST(UnhandledPacketsBuffer, TwoPacketsTwoSsrcsOneMatch) {
UnhandledPacketsBuffer buff;
buff.AddPacket(2, kPacketTimeUs, Create(3));
buff.AddPacket(3, kPacketTimeUs, Create(4));
std::vector<uint32_t> ssrcs = {3};
buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
EXPECT_EQ(ssrc, 3u);
EXPECT_EQ(Create(4), packet);
});
std::vector<uint32_t> ssrcs_again = {2};
buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
EXPECT_EQ(ssrc, 2u);
EXPECT_EQ(Create(3), packet);
});
}
TEST(UnhandledPacketsBuffer, Full) {
const size_t cnt = UnhandledPacketsBuffer::kMaxStashedPackets;
UnhandledPacketsBuffer buff;
for (size_t i = 0; i < cnt; i++) {
buff.AddPacket(2, kPacketTimeUs, Create(i));
}
std::vector<uint32_t> ssrcs = {2};
std::vector<rtc::CopyOnWriteBuffer> packets;
buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
packets.push_back(packet);
});
ASSERT_EQ(cnt, packets.size());
for (size_t i = 0; i < cnt; i++) {
EXPECT_EQ(Create(i), packets[i]);
}
// Add a packet after backfill and check that it comes back.
buff.AddPacket(23, kPacketTimeUs, Create(1001));
buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
EXPECT_EQ(ssrc, 23u);
EXPECT_EQ(Create(1001), packet);
});
}
TEST(UnhandledPacketsBuffer, Wrap) {
UnhandledPacketsBuffer buff;
size_t cnt = UnhandledPacketsBuffer::kMaxStashedPackets + 10;
for (size_t i = 0; i < cnt; i++) {
buff.AddPacket(2, kPacketTimeUs, Create(i));
}
std::vector<uint32_t> ssrcs = {2};
std::vector<rtc::CopyOnWriteBuffer> packets;
buff.BackfillPackets(ssrcs, [&packets](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
packets.push_back(packet);
});
for (size_t i = 0; i < packets.size(); i++) {
EXPECT_EQ(Create(i + 10), packets[i]);
}
// Add a packet after backfill and check that it comes back.
buff.AddPacket(23, kPacketTimeUs, Create(1001));
buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
EXPECT_EQ(ssrc, 23u);
EXPECT_EQ(Create(1001), packet);
});
}
TEST(UnhandledPacketsBuffer, Interleaved) {
UnhandledPacketsBuffer buff;
buff.AddPacket(2, kPacketTimeUs, Create(2));
buff.AddPacket(3, kPacketTimeUs, Create(3));
std::vector<uint32_t> ssrcs = {2};
buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
EXPECT_EQ(ssrc, 2u);
EXPECT_EQ(Create(2), packet);
});
buff.AddPacket(4, kPacketTimeUs, Create(4));
ssrcs = {3};
buff.BackfillPackets(ssrcs, [](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
EXPECT_EQ(ssrc, 3u);
EXPECT_EQ(Create(3), packet);
});
}
} // namespace cricket

View File

@ -528,7 +528,12 @@ WebRtcVideoChannel::WebRtcVideoChannel(
last_stats_log_ms_(-1),
discard_unknown_ssrc_packets_(webrtc::field_trial::IsEnabled(
"WebRTC-Video-DiscardPacketsWithUnknownSsrc")),
crypto_options_(crypto_options) {
crypto_options_(crypto_options),
unknown_ssrc_packet_buffer_(
webrtc::field_trial::IsEnabled(
"WebRTC-Video-BufferPacketsWithUnknownSsrc")
? new UnhandledPacketsBuffer()
: nullptr) {
RTC_DCHECK(thread_checker_.CalledOnValidThread());
rtcp_receiver_report_ssrc_ = kDefaultRtcpReceiverReportSsrc;
@ -1183,7 +1188,7 @@ bool WebRtcVideoChannel::AddRecvStream(const StreamParams& sp,
}
receive_streams_[ssrc] = new WebRtcVideoReceiveStream(
call_, sp, std::move(config), decoder_factory_, default_stream,
this, call_, sp, std::move(config), decoder_factory_, default_stream,
recv_codecs_, flexfec_config);
return true;
@ -1375,12 +1380,17 @@ void WebRtcVideoChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet,
break;
}
if (discard_unknown_ssrc_packets_) {
uint32_t ssrc = 0;
if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) {
return;
}
uint32_t ssrc = 0;
if (!GetRtpSsrc(packet.cdata(), packet.size(), &ssrc)) {
if (unknown_ssrc_packet_buffer_) {
unknown_ssrc_packet_buffer_->AddPacket(ssrc, packet_time_us, packet);
return;
}
if (discard_unknown_ssrc_packets_) {
return;
}
@ -1420,6 +1430,52 @@ void WebRtcVideoChannel::OnPacketReceived(rtc::CopyOnWriteBuffer packet,
}
}
void WebRtcVideoChannel::BackfillBufferedPackets(
rtc::ArrayView<const uint32_t> ssrcs) {
RTC_DCHECK_RUN_ON(&thread_checker_);
if (!unknown_ssrc_packet_buffer_) {
return;
}
int delivery_ok_cnt = 0;
int delivery_unknown_ssrc_cnt = 0;
int delivery_packet_error_cnt = 0;
webrtc::PacketReceiver* receiver = this->call_->Receiver();
unknown_ssrc_packet_buffer_->BackfillPackets(
ssrcs, [&](uint32_t ssrc, int64_t packet_time_us,
rtc::CopyOnWriteBuffer packet) {
switch (receiver->DeliverPacket(webrtc::MediaType::VIDEO, packet,
packet_time_us)) {
case webrtc::PacketReceiver::DELIVERY_OK:
delivery_ok_cnt++;
break;
case webrtc::PacketReceiver::DELIVERY_UNKNOWN_SSRC:
delivery_unknown_ssrc_cnt++;
break;
case webrtc::PacketReceiver::DELIVERY_PACKET_ERROR:
delivery_packet_error_cnt++;
break;
}
});
rtc::StringBuilder out;
out << "[ ";
for (uint32_t ssrc : ssrcs) {
out << std::to_string(ssrc) << " ";
}
out << "]";
auto level = rtc::LS_INFO;
if (delivery_unknown_ssrc_cnt > 0 || delivery_packet_error_cnt > 0) {
level = rtc::LS_ERROR;
}
int total =
delivery_ok_cnt + delivery_unknown_ssrc_cnt + delivery_packet_error_cnt;
RTC_LOG_V(level) << "Backfilled " << total
<< " packets for ssrcs: " << out.Release()
<< " ok: " << delivery_ok_cnt
<< " error: " << delivery_packet_error_cnt
<< " unknown: " << delivery_unknown_ssrc_cnt;
}
void WebRtcVideoChannel::OnRtcpReceived(rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(&thread_checker_);
@ -1909,6 +1965,9 @@ void WebRtcVideoChannel::WebRtcVideoSendStream::SetFrameEncryptor(
RTC_DCHECK_RUN_ON(&thread_checker_);
parameters_.config.frame_encryptor = frame_encryptor;
if (stream_) {
RTC_LOG(LS_INFO)
<< "RecreateWebRtcStream (send) because of SetFrameEncryptor, ssrc="
<< parameters_.config.rtp.ssrcs[0];
RecreateWebRtcStream();
}
}
@ -2237,6 +2296,7 @@ void WebRtcVideoChannel::WebRtcVideoSendStream::RecreateWebRtcStream() {
}
WebRtcVideoChannel::WebRtcVideoReceiveStream::WebRtcVideoReceiveStream(
WebRtcVideoChannel* channel,
webrtc::Call* call,
const StreamParams& sp,
webrtc::VideoReceiveStream::Config config,
@ -2244,7 +2304,8 @@ WebRtcVideoChannel::WebRtcVideoReceiveStream::WebRtcVideoReceiveStream(
bool default_stream,
const std::vector<VideoCodecSettings>& recv_codecs,
const webrtc::FlexfecReceiveStream::Config& flexfec_config)
: call_(call),
: channel_(channel),
call_(call),
stream_params_(sp),
stream_(NULL),
default_stream_(default_stream),
@ -2440,6 +2501,16 @@ void WebRtcVideoChannel::WebRtcVideoReceiveStream::RecreateWebRtcVideoStream() {
}
MaybeAssociateFlexfecWithVideo();
stream_->Start();
if (webrtc::field_trial::IsEnabled(
"WebRTC-Video-BufferPacketsWithUnknownSsrc")) {
// TODO(bugs.webrtc.org/10416) : Remove this check and backfill
// when the stream is created (i.e remote check for frame_decryptor)
// once FrameDecryptor is created as part of creating receive stream.
if (config_.frame_decryptor) {
channel_->BackfillBufferedPackets(stream_params_.ssrcs);
}
}
}
void WebRtcVideoChannel::WebRtcVideoReceiveStream::
@ -2496,6 +2567,9 @@ void WebRtcVideoChannel::WebRtcVideoReceiveStream::SetFrameDecryptor(
rtc::scoped_refptr<webrtc::FrameDecryptorInterface> frame_decryptor) {
config_.frame_decryptor = frame_decryptor;
if (stream_) {
RTC_LOG(LS_INFO)
<< "RecreateWebRtcStream (recv) because of SetFrameDecryptor, "
<< "remote_ssrc=" << config_.rtp.remote_ssrc;
RecreateWebRtcVideoStream();
}
}

View File

@ -29,6 +29,7 @@
#include "call/video_receive_stream.h"
#include "call/video_send_stream.h"
#include "media/base/media_engine.h"
#include "media/engine/unhandled_packets_buffer.h"
#include "rtc_base/async_invoker.h"
#include "rtc_base/critical_section.h"
#include "rtc_base/network_route.h"
@ -199,6 +200,10 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport {
std::vector<webrtc::RtpSource> GetSources(uint32_t ssrc) const override;
// Take the buffered packets for |ssrcs| and feed them into DeliverPacket.
// This method does nothing unless unknown_ssrc_packet_buffer_ is configured.
void BackfillBufferedPackets(rtc::ArrayView<const uint32_t> ssrcs);
private:
class WebRtcVideoReceiveStream;
struct VideoCodecSettings {
@ -375,6 +380,7 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport {
: public rtc::VideoSinkInterface<webrtc::VideoFrame> {
public:
WebRtcVideoReceiveStream(
WebRtcVideoChannel* channel,
webrtc::Call* call,
const StreamParams& sp,
webrtc::VideoReceiveStream::Config config,
@ -425,6 +431,7 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport {
std::string GetCodecNameFromPayloadType(int payload_type);
WebRtcVideoChannel* const channel_;
webrtc::Call* const call_;
const StreamParams stream_params_;
@ -539,6 +546,10 @@ class WebRtcVideoChannel : public VideoMediaChannel, public webrtc::Transport {
// Per peer connection crypto options that last for the lifetime of the peer
// connection.
const webrtc::CryptoOptions crypto_options_ RTC_GUARDED_BY(thread_checker_);
// Buffer for unhandled packets.
std::unique_ptr<UnhandledPacketsBuffer> unknown_ssrc_packet_buffer_
RTC_GUARDED_BY(thread_checker_);
};
class EncoderStreamFactory