Remove packets from RtpPacketHistory if acked via TransportFeedback

If the receiver has indicated that a packet has been received, via a
TransportFeedback RTCP message, it is safe to remove it from the
RtpPacketHistory as we can be sure it won't be needed anymore.
This will reduce memory usage, reduce the risk of overflow in the
history at very high bitrates, and hopefully make payload based padding
a little more useful.

Bug: webrtc:8975
Change-Id: I703a353252943f63d7d6edda68f03bc482633fd6
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/133028
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27745}
This commit is contained in:
Erik Språng
2019-04-24 19:26:40 +02:00
committed by Commit Bot
parent a8ae407a48
commit 3890e99b70
12 changed files with 236 additions and 28 deletions

View File

@ -16,6 +16,7 @@
#include <utility>
#include "absl/memory/memory.h"
#include "api/array_view.h"
#include "api/transport/field_trial_based_config.h"
#include "call/rtp_transport_controller_send_interface.h"
#include "modules/pacing/packet_router.h"
@ -323,18 +324,14 @@ RtpVideoSender::RtpVideoSender(
fec_controller_->SetProtectionCallback(this);
// Signal congestion controller this object is ready for OnPacket* callbacks.
if (fec_controller_->UseLossVectorMask()) {
transport_->RegisterPacketFeedbackObserver(this);
}
transport_->RegisterPacketFeedbackObserver(this);
}
RtpVideoSender::~RtpVideoSender() {
for (const RtpStreamSender& stream : rtp_streams_) {
transport_->packet_router()->RemoveSendRtpModule(stream.rtp_rtcp.get());
}
if (fec_controller_->UseLossVectorMask()) {
transport_->DeRegisterPacketFeedbackObserver(this);
}
transport_->DeRegisterPacketFeedbackObserver(this);
}
void RtpVideoSender::RegisterProcessThread(
@ -567,6 +564,7 @@ void RtpVideoSender::DeliverRtcp(const uint8_t* packet, size_t length) {
void RtpVideoSender::ConfigureSsrcs(const RtpConfig& rtp_config) {
// Configure regular SSRCs.
ssrc_to_acknowledged_packets_observers_.clear();
for (size_t i = 0; i < rtp_config.ssrcs.size(); ++i) {
uint32_t ssrc = rtp_config.ssrcs[i];
RtpRtcp* const rtp_rtcp = rtp_streams_[i].rtp_rtcp.get();
@ -576,6 +574,12 @@ void RtpVideoSender::ConfigureSsrcs(const RtpConfig& rtp_config) {
auto it = suspended_ssrcs_.find(ssrc);
if (it != suspended_ssrcs_.end())
rtp_rtcp->SetRtpState(it->second);
AcknowledgedPacketsObserver* receive_observer =
rtp_rtcp->GetAcknowledgedPacketsObserver();
if (receive_observer != nullptr) {
ssrc_to_acknowledged_packets_observers_[ssrc] = receive_observer;
}
}
// Set up RTX if available.
@ -791,15 +795,41 @@ void RtpVideoSender::OnPacketAdded(uint32_t ssrc, uint16_t seq_num) {
void RtpVideoSender::OnPacketFeedbackVector(
const std::vector<PacketFeedback>& packet_feedback_vector) {
rtc::CritScope lock(&crit_);
// Lost feedbacks are not considered to be lost packets.
const bool use_loss_mask = fec_controller_->UseLossVectorMask();
std::vector<bool> loss_mask;
std::map<uint32_t, std::vector<uint16_t>> acked_packets;
for (const PacketFeedback& packet : packet_feedback_vector) {
auto it = feedback_packet_seq_num_set_.find(packet.sequence_number);
if (it != feedback_packet_seq_num_set_.end()) {
const bool lost = packet.arrival_time_ms == PacketFeedback::kNotReceived;
loss_mask_vector_.push_back(lost);
feedback_packet_seq_num_set_.erase(it);
if (use_loss_mask) {
// Lost feedbacks are not considered to be lost packets.
auto it = feedback_packet_seq_num_set_.find(packet.sequence_number);
if (it != feedback_packet_seq_num_set_.end()) {
const bool lost =
packet.arrival_time_ms == PacketFeedback::kNotReceived;
loss_mask.push_back(lost);
feedback_packet_seq_num_set_.erase(it);
}
}
if (packet.ssrc) {
if (ssrc_to_acknowledged_packets_observers_.find(*packet.ssrc) !=
ssrc_to_acknowledged_packets_observers_.end()) {
acked_packets[*packet.ssrc].push_back(packet.rtp_sequence_number);
}
}
}
if (use_loss_mask) {
rtc::CritScope cs(&crit_);
loss_mask_vector_.insert(loss_mask_vector_.end(), loss_mask.begin(),
loss_mask.end());
}
for (const auto& kv : acked_packets) {
const uint32_t ssrc = kv.first;
rtc::ArrayView<const uint16_t> rtp_sequence_numbers(kv.second);
RTC_DCHECK(ssrc_to_acknowledged_packets_observers_.find(ssrc) !=
ssrc_to_acknowledged_packets_observers_.end());
ssrc_to_acknowledged_packets_observers_[ssrc]->OnPacketsAcknowledged(
rtp_sequence_numbers);
}
}

View File

@ -172,7 +172,7 @@ class RtpVideoSender : public RtpVideoSenderInterface,
std::map<uint32_t, RtpState> suspended_ssrcs_;
std::unique_ptr<FlexfecSender> flexfec_sender_;
std::unique_ptr<FecController> fec_controller_;
const std::unique_ptr<FecController> fec_controller_;
// Rtp modules are assumed to be sorted in simulcast index order.
const std::vector<webrtc_internal_rtp_video_sender::RtpStreamSender>
rtp_streams_;
@ -197,6 +197,12 @@ class RtpVideoSender : public RtpVideoSenderInterface,
std::vector<FrameCounts> frame_counts_ RTC_GUARDED_BY(crit_);
FrameCountObserver* const frame_count_observer_;
// Effectively const map from ssrc to AcknowledgedPacketsObserver. This
// map is set at construction time and never changed, but it's
// non-trivial to make it properly const.
std::map<uint32_t, AcknowledgedPacketsObserver*>
ssrc_to_acknowledged_packets_observers_;
RTC_DISALLOW_COPY_AND_ASSIGN(RtpVideoSender);
};

View File

@ -15,8 +15,13 @@
#include "api/task_queue/default_task_queue_factory.h"
#include "call/rtp_transport_controller_send.h"
#include "call/rtp_video_sender.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/byte_io.h"
#include "modules/rtp_rtcp/source/rtcp_packet/nack.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "modules/video_coding/fec_controller_default.h"
#include "modules/video_coding/include/video_codec_interface.h"
#include "rtc_base/event.h"
#include "rtc_base/rate_limiter.h"
#include "test/field_trial.h"
#include "test/gmock.h"
@ -27,6 +32,7 @@
#include "video/send_statistics_proxy.h"
using ::testing::_;
using ::testing::Invoke;
using ::testing::NiceMock;
using ::testing::SaveArg;
using ::testing::Unused;
@ -36,6 +42,8 @@ namespace {
const int8_t kPayloadType = 96;
const uint32_t kSsrc1 = 12345;
const uint32_t kSsrc2 = 23456;
const uint32_t kRtxSsrc1 = 34567;
const uint32_t kRtxSsrc2 = 45678;
const int16_t kInitialPictureId1 = 222;
const int16_t kInitialPictureId2 = 44;
const int16_t kInitialTl0PicIdx1 = 99;
@ -60,6 +68,7 @@ RtpSenderObservers CreateObservers(
RtpSenderObservers observers;
observers.rtcp_rtt_stats = rtcp_rtt_stats;
observers.intra_frame_callback = intra_frame_callback;
observers.rtcp_loss_notification_observer = nullptr;
observers.rtcp_stats = rtcp_stats;
observers.rtp_stats = rtp_stats;
observers.bitrate_observer = bitrate_observer;
@ -70,16 +79,43 @@ RtpSenderObservers CreateObservers(
return observers;
}
BitrateConstraints GetBitrateConfig() {
BitrateConstraints bitrate_config;
bitrate_config.min_bitrate_bps = 30000;
bitrate_config.start_bitrate_bps = 300000;
bitrate_config.max_bitrate_bps = 3000000;
return bitrate_config;
}
VideoSendStream::Config CreateVideoSendStreamConfig(
Transport* transport,
const std::vector<uint32_t>& ssrcs,
const std::vector<uint32_t>& rtx_ssrcs,
int payload_type) {
VideoSendStream::Config config(transport);
config.rtp.ssrcs = ssrcs;
config.rtp.rtx.ssrcs = rtx_ssrcs;
config.rtp.payload_type = payload_type;
config.rtp.rtx.payload_type = payload_type + 1;
config.rtp.nack.rtp_history_ms = 1000;
return config;
}
class RtpVideoSenderTestFixture {
public:
RtpVideoSenderTestFixture(
const std::vector<uint32_t>& ssrcs,
const std::vector<uint32_t>& rtx_ssrcs,
int payload_type,
const std::map<uint32_t, RtpPayloadState>& suspended_payload_states,
FrameCountObserver* frame_count_observer)
: clock_(1000000),
config_(&transport_),
config_(CreateVideoSendStreamConfig(&transport_,
ssrcs,
rtx_ssrcs,
payload_type)),
send_delay_stats_(&clock_),
bitrate_config_(GetBitrateConfig()),
task_queue_factory_(CreateDefaultTaskQueueFactory()),
transport_controller_(&clock_,
&event_log_,
@ -94,10 +130,6 @@ class RtpVideoSenderTestFixture {
config_,
VideoEncoderConfig::ContentType::kRealtimeVideo),
retransmission_rate_limiter_(&clock_, kRetransmitWindowSizeMs) {
for (uint32_t ssrc : ssrcs) {
config_.rtp.ssrcs.push_back(ssrc);
}
config_.rtp.payload_type = payload_type;
std::map<uint32_t, RtpState> suspended_ssrcs;
router_ = absl::make_unique<RtpVideoSender>(
&clock_, suspended_ssrcs, suspended_payload_states, config_.rtp,
@ -111,14 +143,18 @@ class RtpVideoSenderTestFixture {
}
RtpVideoSenderTestFixture(
const std::vector<uint32_t>& ssrcs,
const std::vector<uint32_t>& rtx_ssrcs,
int payload_type,
const std::map<uint32_t, RtpPayloadState>& suspended_payload_states)
: RtpVideoSenderTestFixture(ssrcs,
rtx_ssrcs,
payload_type,
suspended_payload_states,
/*frame_count_observer=*/nullptr) {}
RtpVideoSender* router() { return router_.get(); }
MockTransport& transport() { return transport_; }
SimulatedClock& clock() { return clock_; }
private:
NiceMock<MockTransport> transport_;
@ -148,7 +184,7 @@ TEST(RtpVideoSenderTest, SendOnOneModule) {
encoded_image.data()[0] = kPayload;
encoded_image.set_size(1);
RtpVideoSenderTestFixture test({kSsrc1}, kPayloadType, {});
RtpVideoSenderTestFixture test({kSsrc1}, {kRtxSsrc1}, kPayloadType, {});
EXPECT_NE(
EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr, nullptr).error);
@ -179,7 +215,8 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) {
encoded_image_1.data()[0] = kPayload;
encoded_image_1.set_size(1);
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, kPayloadType, {});
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
CodecSpecificInfo codec_info;
codec_info.codecType = kVideoCodecVP8;
@ -226,7 +263,8 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) {
EncodedImage encoded_image_2(encoded_image_1);
encoded_image_2.SetSpatialIndex(1);
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, kPayloadType, {});
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
CodecSpecificInfo codec_info;
codec_info.codecType = kVideoCodecVP8;
@ -256,7 +294,8 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) {
}
TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, kPayloadType, {});
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
test.router()->SetActive(true);
std::map<uint32_t, RtpPayloadState> initial_states =
@ -280,7 +319,8 @@ TEST(RtpVideoSenderTest, CreateWithPreviousStates) {
std::map<uint32_t, RtpPayloadState> states = {{kSsrc1, state1},
{kSsrc2, state2}};
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, kPayloadType, states);
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, states);
test.router()->SetActive(true);
std::map<uint32_t, RtpPayloadState> initial_states =
@ -301,7 +341,8 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) {
void(const FrameCounts& frame_counts, uint32_t ssrc));
} callback;
RtpVideoSenderTestFixture test({kSsrc1}, kPayloadType, {}, &callback);
RtpVideoSenderTestFixture test({kSsrc1}, {kRtxSsrc1}, kPayloadType, {},
&callback);
constexpr uint8_t kPayload = 'a';
EncodedImage encoded_image;
@ -346,4 +387,86 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) {
EXPECT_EQ(1, frame_counts.delta_frames);
}
TEST(RtpVideoSenderTest, PropagatesTransportFeedbackToRtpSender) {
const int64_t kTimeoutMs = 500;
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
test.router()->SetActive(true);
constexpr uint8_t kPayload = 'a';
EncodedImage encoded_image;
encoded_image.SetTimestamp(1);
encoded_image.capture_time_ms_ = 2;
encoded_image._frameType = VideoFrameType::kVideoFrameKey;
encoded_image.Allocate(1);
encoded_image.data()[0] = kPayload;
encoded_image.set_size(1);
// Send image, capture first RTP packet.
rtc::Event event;
uint16_t rtp_sequence_number = 0;
uint16_t transport_sequence_number = 0;
EXPECT_CALL(test.transport(), SendRtp(_, _, _))
.WillOnce(
Invoke([&event, &rtp_sequence_number, &transport_sequence_number](
const uint8_t* packet, size_t length,
const PacketOptions& options) {
RtpPacket rtp_packet;
EXPECT_TRUE(rtp_packet.Parse(packet, length));
rtp_sequence_number = rtp_packet.SequenceNumber();
transport_sequence_number = options.packet_id;
event.Set();
return true;
}));
EXPECT_EQ(
EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr, nullptr).error);
test.clock().AdvanceTimeMilliseconds(33);
ASSERT_TRUE(event.Wait(kTimeoutMs));
// Construct a NACK message for requesting retransmission of the packet.
std::vector<uint16_t> nack_list;
nack_list.push_back(rtp_sequence_number);
rtcp::Nack nack;
nack.SetMediaSsrc(kSsrc1);
nack.SetPacketIds(nack_list);
rtc::Buffer nack_buffer = nack.Build();
uint16_t retransmitted_rtp_sequence_number = 0;
EXPECT_CALL(test.transport(), SendRtp)
.WillOnce([&event, &retransmitted_rtp_sequence_number](
const uint8_t* packet, size_t length,
const PacketOptions& options) {
RtpPacket rtp_packet;
EXPECT_TRUE(rtp_packet.Parse(packet, length));
EXPECT_EQ(rtp_packet.Ssrc(), kRtxSsrc1);
// Capture the retransmitted sequence number from the RTX header.
rtc::ArrayView<const uint8_t> payload = rtp_packet.payload();
retransmitted_rtp_sequence_number =
ByteReader<uint16_t>::ReadBigEndian(payload.data());
event.Set();
return true;
});
test.router()->DeliverRtcp(nack_buffer.data(), nack_buffer.size());
ASSERT_TRUE(event.Wait(kTimeoutMs));
EXPECT_EQ(retransmitted_rtp_sequence_number, rtp_sequence_number);
// Simulate transport feedback indicating packet has been received.
PacketFeedback feedback(test.clock().TimeInMilliseconds(),
transport_sequence_number);
feedback.rtp_sequence_number = rtp_sequence_number;
feedback.ssrc = kSsrc1;
test.router()->OnPacketFeedbackVector(
std::vector<PacketFeedback>(1, feedback));
// Advance time to make sure retranmission would be allowed and try again.
// This time the retranmission should not happen since the paket history
// has been notified of the ack and removed the packet.
test.clock().AdvanceTimeMilliseconds(33);
EXPECT_CALL(test.transport(), SendRtp).Times(0);
test.router()->DeliverRtcp(nack_buffer.data(), nack_buffer.size());
ASSERT_FALSE(event.Wait(kTimeoutMs));
}
} // namespace webrtc

View File

@ -275,6 +275,11 @@ class RtpRtcp : public Module, public RtcpFeedbackSenderInterface {
virtual StreamDataCountersCallback* GetSendChannelRtpStatisticsCallback()
const = 0;
// Returns a pointer to an observer that handles information about packets
// that have been received by the remote end, or nullptr if not applicable.
virtual AcknowledgedPacketsObserver* GetAcknowledgedPacketsObserver()
const = 0;
// **************************************************************************
// RTCP
// **************************************************************************

View File

@ -16,6 +16,7 @@
#include <vector>
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "api/audio_codecs/audio_format.h"
#include "api/rtp_headers.h"
@ -276,7 +277,7 @@ struct PacketFeedback {
PacedPacketInfo pacing_info;
// The SSRC and RTP sequence number of the packet this feedback refers to.
uint32_t ssrc;
absl::optional<uint32_t> ssrc;
uint16_t rtp_sequence_number;
};
@ -313,6 +314,17 @@ class TransportFeedbackObserver {
virtual void OnTransportFeedback(const rtcp::TransportFeedback& feedback) = 0;
};
class AcknowledgedPacketsObserver {
public:
AcknowledgedPacketsObserver() = default;
virtual ~AcknowledgedPacketsObserver() = default;
// Indicates RTP sequence numbers for packets that have been acknowledged as
// received by the remote end.
virtual void OnPacketsAcknowledged(
rtc::ArrayView<const uint16_t> sequence_numbers) = 0;
};
// Interface for PacketRouter to send rtcp feedback on behalf of
// congestion controller.
// TODO(bugs.webrtc.org/8239): Remove and use RtcpTransceiver directly

View File

@ -157,7 +157,9 @@ class MockRtpRtcp : public RtpRtcp {
MOCK_METHOD1(RegisterSendChannelRtpStatisticsCallback,
void(StreamDataCountersCallback*));
MOCK_CONST_METHOD0(GetSendChannelRtpStatisticsCallback,
StreamDataCountersCallback*(void));
StreamDataCountersCallback*());
MOCK_CONST_METHOD0(GetAcknowledgedPacketsObserver,
AcknowledgedPacketsObserver*());
MOCK_METHOD1(SetVideoBitrateAllocation, void(const VideoBitrateAllocation&));
MOCK_METHOD0(RtpSender, RTPSender*());
MOCK_CONST_METHOD0(RtpSender, const RTPSender*());

View File

@ -232,6 +232,17 @@ std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetBestFittingPacket(
return absl::make_unique<RtpPacketToSend>(*best_packet);
}
void RtpPacketHistory::CullAcknowledgedPackets(
rtc::ArrayView<const uint16_t> sequence_numbers) {
rtc::CritScope cs(&lock_);
for (uint16_t sequence_number : sequence_numbers) {
auto stored_packet_it = packet_history_.find(sequence_number);
if (stored_packet_it != packet_history_.end()) {
RemovePacket(stored_packet_it);
}
}
}
void RtpPacketHistory::Reset() {
packet_history_.clear();
packet_size_.clear();

View File

@ -89,6 +89,9 @@ class RtpPacketHistory {
std::unique_ptr<RtpPacketToSend> GetBestFittingPacket(
size_t packet_size) const;
// Cull packets that have been acknowledged as received by the remote end.
void CullAcknowledgedPackets(rtc::ArrayView<const uint16_t> sequence_numbers);
private:
struct StoredPacket {
StoredPacket();
@ -133,6 +136,7 @@ class RtpPacketHistory {
// Map from rtp sequence numbers to stored packet.
std::map<uint16_t, StoredPacket> packet_history_ RTC_GUARDED_BY(lock_);
// Map from packet size to sequence number.
std::map<size_t, uint16_t> packet_size_ RTC_GUARDED_BY(lock_);
// The earliest packet in the history. This might not be the lowest sequence

View File

@ -867,6 +867,11 @@ ModuleRtpRtcpImpl::GetSendChannelRtpStatisticsCallback() const {
return rtp_sender_->GetRtpStatisticsCallback();
}
AcknowledgedPacketsObserver* ModuleRtpRtcpImpl::GetAcknowledgedPacketsObserver()
const {
return rtp_sender_.get();
}
void ModuleRtpRtcpImpl::SetVideoBitrateAllocation(
const VideoBitrateAllocation& bitrate) {
rtcp_sender_.SetVideoBitrateAllocation(bitrate);

View File

@ -276,6 +276,7 @@ class ModuleRtpRtcpImpl : public RtpRtcp, public RTCPReceiver::ModuleRtpRtcp {
StreamDataCountersCallback* callback) override;
StreamDataCountersCallback* GetSendChannelRtpStatisticsCallback()
const override;
AcknowledgedPacketsObserver* GetAcknowledgedPacketsObserver() const override;
void OnReceivedNack(
const std::vector<uint16_t>& nack_sequence_numbers) override;

View File

@ -1207,6 +1207,7 @@ void RTPSender::AddPacketToTransportFeedback(
RtpPacketSendInfo packet_info;
packet_info.ssrc = SSRC();
packet_info.transport_sequence_number = packet_id;
packet_info.has_rtp_sequence_number = true;
packet_info.rtp_sequence_number = packet.SequenceNumber();
packet_info.length = packet_size;
packet_info.pacing_info = pacing_info;
@ -1238,4 +1239,9 @@ void RTPSender::SetRtt(int64_t rtt_ms) {
packet_history_.SetRtt(rtt_ms);
flexfec_packet_history_.SetRtt(rtt_ms);
}
void RTPSender::OnPacketsAcknowledged(
rtc::ArrayView<const uint16_t> sequence_numbers) {
packet_history_.CullAcknowledgedPackets(sequence_numbers);
}
} // namespace webrtc

View File

@ -42,7 +42,7 @@ class RateLimiter;
class RtcEventLog;
class RtpPacketToSend;
class RTPSender {
class RTPSender : public AcknowledgedPacketsObserver {
public:
RTPSender(bool audio,
Clock* clock,
@ -173,6 +173,9 @@ class RTPSender {
void SetRtt(int64_t rtt_ms);
void OnPacketsAcknowledged(
rtc::ArrayView<const uint16_t> sequence_numbers) override;
private:
// Maps capture time in milliseconds to send-side delay in milliseconds.
// Send-side delay is the difference between transmission time and capture