RtpPacketHistory: StoreAndCull default on, support ack removals

Add support for potentially out-of-order removals of packets, using a
vector of sequence numbers that have been acknowledges as received.

Additionally, make kStoreAndCull storage method by default with a
field-trial kill-switch if things go wrong unexpectedly.

Bug: webrtc:8975
Change-Id: I6da8b92d85fc362c12db82976f115626cb1d32d4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/134307
Reviewed-by: Åsa Persson <asapersson@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27850}
This commit is contained in:
Erik Språng
2019-05-03 10:58:50 -04:00
committed by Commit Bot
parent 9363c778fe
commit d2a634447f
5 changed files with 140 additions and 41 deletions

View File

@ -79,6 +79,10 @@ void RtpPacketHistory::SetRtt(int64_t rtt_ms) {
rtc::CritScope cs(&lock_);
RTC_DCHECK_GE(rtt_ms, 0);
rtt_ms_ = rtt_ms;
// If kStoreAndCull mode is used, packets will be removed after a timeout
// that depends on the RTT. Changing the RTT may thus cause some packets
// become "old" and subject to removal.
CullOldPackets(clock_->TimeInMilliseconds());
}
void RtpPacketHistory::PutRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
@ -232,6 +236,19 @@ std::unique_ptr<RtpPacketToSend> RtpPacketHistory::GetBestFittingPacket(
return absl::make_unique<RtpPacketToSend>(*best_packet);
}
void RtpPacketHistory::CullAcknowledgedPackets(
rtc::ArrayView<const uint16_t> sequence_numbers) {
rtc::CritScope cs(&lock_);
if (mode_ == StorageMode::kStoreAndCull) {
for (uint16_t sequence_number : sequence_numbers) {
auto stored_packet_it = packet_history_.find(sequence_number);
if (stored_packet_it != packet_history_.end()) {
RemovePacket(stored_packet_it);
}
}
}
}
void RtpPacketHistory::Reset() {
packet_history_.clear();
packet_size_.clear();
@ -283,20 +300,27 @@ std::unique_ptr<RtpPacketToSend> RtpPacketHistory::RemovePacket(
// Move the packet out from the StoredPacket container.
std::unique_ptr<RtpPacketToSend> rtp_packet =
std::move(packet_it->second.packet);
// Check if this is the oldest packet in the history, as this must be updated
// in order to cull old packets.
const bool is_first_packet = packet_it->first == start_seqno_;
// Erase the packet from the map, and capture iterator to the next one.
StoredPacketIterator next_it = packet_history_.erase(packet_it);
// |next_it| now points to the next element, or to the end. If the end,
// check if we can wrap around.
if (next_it == packet_history_.end()) {
next_it = packet_history_.begin();
}
if (is_first_packet) {
// |next_it| now points to the next element, or to the end. If the end,
// check if we can wrap around.
if (next_it == packet_history_.end()) {
next_it = packet_history_.begin();
}
// Update |start_seq_no| to the new oldest item.
if (next_it != packet_history_.end()) {
start_seqno_ = next_it->first;
} else {
start_seqno_.reset();
// Update |start_seq_no| to the new oldest item.
if (next_it != packet_history_.end()) {
start_seqno_ = next_it->first;
} else {
start_seqno_.reset();
}
}
auto size_iterator = packet_size_.find(rtp_packet->size());

View File

@ -89,6 +89,9 @@ class RtpPacketHistory {
std::unique_ptr<RtpPacketToSend> GetBestFittingPacket(
size_t packet_size) const;
// Cull packets that have been acknowledged as received by the remote end.
void CullAcknowledgedPackets(rtc::ArrayView<const uint16_t> sequence_numbers);
private:
struct StoredPacket {
StoredPacket();
@ -133,6 +136,7 @@ class RtpPacketHistory {
// Map from rtp sequence numbers to stored packet.
std::map<uint16_t, StoredPacket> packet_history_ RTC_GUARDED_BY(lock_);
// Map from packet size to sequence number.
std::map<size_t, uint16_t> packet_size_ RTC_GUARDED_BY(lock_);
// The earliest packet in the history. This might not be the lowest sequence

View File

@ -82,9 +82,9 @@ TEST_F(RtpPacketHistoryTest, StartSeqResetAfterReset) {
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
// Add a new packet.
hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission,
absl::nullopt);
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1));
hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)),
kAllowRetransmission, absl::nullopt);
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
// Advance time past where packet expires.
fake_clock_.AdvanceTimeMilliseconds(
@ -95,7 +95,7 @@ TEST_F(RtpPacketHistoryTest, StartSeqResetAfterReset) {
hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 2)),
kAllowRetransmission, absl::nullopt);
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1));
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2)));
}
@ -282,7 +282,8 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestSentPacketWhenAtMaxSize) {
// Add packets until the buffer is full.
for (size_t i = 0; i < kMaxNumPackets; ++i) {
std::unique_ptr<RtpPacketToSend> packet = CreateRtpPacket(kStartSeqNum + i);
std::unique_ptr<RtpPacketToSend> packet =
CreateRtpPacket(To16u(kStartSeqNum + i));
// Immediate mark packet as sent.
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
@ -300,7 +301,7 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestSentPacketWhenAtMaxSize) {
// Oldest packet should be gone, but packet after than one still present.
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1));
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
}
TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) {
@ -312,7 +313,8 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) {
// Add packets until the buffer is full.
for (size_t i = 0; i < kMaxNumPackets; ++i) {
std::unique_ptr<RtpPacketToSend> packet = CreateRtpPacket(kStartSeqNum + i);
std::unique_ptr<RtpPacketToSend> packet =
CreateRtpPacket(To16u(kStartSeqNum + i));
// Don't mark packets as sent, preventing them from being removed.
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission, absl::nullopt);
}
@ -328,7 +330,7 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) {
// Oldest packet should be gone, but packet after than one still present.
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1));
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
}
TEST_F(RtpPacketHistoryTest, DontRemoveUnsentPackets) {
@ -361,7 +363,7 @@ TEST_F(RtpPacketHistoryTest, DontRemoveUnsentPackets) {
hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + kMaxNumPackets + 1)),
kAllowRetransmission, fake_clock_.TimeInMilliseconds());
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum + 1));
EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2)));
}
@ -376,8 +378,8 @@ TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPackets) {
1);
// Add a new packet to trigger culling.
hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)),
kAllowRetransmission, fake_clock_.TimeInMilliseconds());
// First packet should still be there.
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum));
@ -387,7 +389,7 @@ TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPackets) {
kAllowRetransmission, fake_clock_.TimeInMilliseconds());
// First packet should no be gone, but next one still there.
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1));
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
}
TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPacketsHighRtt) {
@ -405,8 +407,8 @@ TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPacketsHighRtt) {
fake_clock_.AdvanceTimeMilliseconds(kPacketTimeoutMs - 1);
// Add a new packet to trigger culling.
hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)),
kAllowRetransmission, fake_clock_.TimeInMilliseconds());
// First packet should still be there.
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum));
@ -416,7 +418,7 @@ TEST_F(RtpPacketHistoryTest, DontRemoveTooRecentlyTransmittedPacketsHighRtt) {
kAllowRetransmission, fake_clock_.TimeInMilliseconds());
// First packet should no be gone, but next one still there.
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum + 1));
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)));
}
TEST_F(RtpPacketHistoryTest, RemovesOldWithCulling) {
@ -436,8 +438,8 @@ TEST_F(RtpPacketHistoryTest, RemovesOldWithCulling) {
// Advance to where packet can be culled, even if buffer is not full.
fake_clock_.AdvanceTimeMilliseconds(1);
hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)),
kAllowRetransmission, fake_clock_.TimeInMilliseconds());
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
}
@ -462,8 +464,8 @@ TEST_F(RtpPacketHistoryTest, RemovesOldWithCullingHighRtt) {
// Advance to where packet can be culled, even if buffer is not full.
fake_clock_.AdvanceTimeMilliseconds(1);
hist_.PutRtpPacket(CreateRtpPacket(kStartSeqNum + 1), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
hist_.PutRtpPacket(CreateRtpPacket(To16u(kStartSeqNum + 1)),
kAllowRetransmission, fake_clock_.TimeInMilliseconds());
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum));
}
@ -478,7 +480,7 @@ TEST_F(RtpPacketHistoryTest, GetBestFittingPacket) {
const size_t target_packet_size = packet->size();
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
packet = CreateRtpPacket(kStartSeqNum + 1);
packet = CreateRtpPacket(To16u(kStartSeqNum + 1));
packet->SetPayloadSize(kTargetSize - 1);
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
@ -510,7 +512,7 @@ TEST_F(RtpPacketHistoryTest,
RtpPacketHistory::kPacketCullingDelayFactor *
RtpPacketHistory::kMinPacketDurationMs);
packet = CreateRtpPacket(kStartSeqNum + 1);
packet = CreateRtpPacket(To16u(kStartSeqNum + 1));
packet->SetPayloadSize(100);
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
@ -518,7 +520,7 @@ TEST_F(RtpPacketHistoryTest,
auto best_packet = hist_.GetBestFittingPacket(target_packet_size + 2);
ASSERT_THAT(best_packet, ::testing::NotNull());
EXPECT_EQ(best_packet->SequenceNumber(), kStartSeqNum + 1);
EXPECT_EQ(best_packet->SequenceNumber(), To16u(kStartSeqNum + 1));
}
TEST_F(RtpPacketHistoryTest, GetBestFittingPacketReturnLastPacketWhenSameSize) {
@ -530,14 +532,14 @@ TEST_F(RtpPacketHistoryTest, GetBestFittingPacketReturnLastPacketWhenSameSize) {
packet->SetPayloadSize(kTargetSize);
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
packet = CreateRtpPacket(kStartSeqNum + 1);
packet = CreateRtpPacket(To16u(kStartSeqNum + 1));
packet->SetPayloadSize(kTargetSize);
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
auto best_packet = hist_.GetBestFittingPacket(123);
ASSERT_THAT(best_packet, ::testing::NotNull());
EXPECT_EQ(best_packet->SequenceNumber(), kStartSeqNum + 1);
EXPECT_EQ(best_packet->SequenceNumber(), To16u(kStartSeqNum + 1));
}
TEST_F(RtpPacketHistoryTest,
@ -551,7 +553,7 @@ TEST_F(RtpPacketHistoryTest,
hist_.PutRtpPacket(std::move(small_packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
auto large_packet = CreateRtpPacket(kStartSeqNum + 1);
auto large_packet = CreateRtpPacket(To16u(kStartSeqNum + 1));
large_packet->SetPayloadSize(kTargetSize * 2);
hist_.PutRtpPacket(std::move(large_packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
@ -563,7 +565,7 @@ TEST_F(RtpPacketHistoryTest,
ASSERT_THAT(hist_.GetBestFittingPacket(kTargetSize * 2),
::testing::NotNull());
EXPECT_EQ(hist_.GetBestFittingPacket(kTargetSize * 2)->SequenceNumber(),
kStartSeqNum + 1);
To16u(kStartSeqNum + 1));
}
TEST_F(RtpPacketHistoryTest,
@ -578,4 +580,60 @@ TEST_F(RtpPacketHistoryTest,
::testing::NotNull());
}
TEST_F(RtpPacketHistoryTest, CullWithAcks) {
const int64_t kPacketLifetime = RtpPacketHistory::kMinPacketDurationMs *
RtpPacketHistory::kPacketCullingDelayFactor;
const int64_t start_time = fake_clock_.TimeInMilliseconds();
hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, 10);
// Insert three packets 33ms apart, immediately mark them as sent.
std::unique_ptr<RtpPacketToSend> packet = CreateRtpPacket(kStartSeqNum);
packet->SetPayloadSize(50);
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
hist_.GetPacketAndSetSendTime(kStartSeqNum);
fake_clock_.AdvanceTimeMilliseconds(33);
packet = CreateRtpPacket(To16u(kStartSeqNum + 1));
packet->SetPayloadSize(50);
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
hist_.GetPacketAndSetSendTime(To16u(kStartSeqNum + 1));
fake_clock_.AdvanceTimeMilliseconds(33);
packet = CreateRtpPacket(To16u(kStartSeqNum + 2));
packet->SetPayloadSize(50);
hist_.PutRtpPacket(std::move(packet), kAllowRetransmission,
fake_clock_.TimeInMilliseconds());
hist_.GetPacketAndSetSendTime(To16u(kStartSeqNum + 2));
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum).has_value());
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1)).has_value());
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value());
// Remove middle one using ack, check that only that one is gone.
std::vector<uint16_t> acked_sequence_numbers = {To16u(kStartSeqNum + 1)};
hist_.CullAcknowledgedPackets(acked_sequence_numbers);
EXPECT_TRUE(hist_.GetPacketState(kStartSeqNum).has_value());
EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 1)).has_value());
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value());
// Advance time to where second packet would have expired, verify first packet
// is removed.
int64_t second_packet_expiry_time = start_time + kPacketLifetime + 33 + 1;
fake_clock_.AdvanceTimeMilliseconds(second_packet_expiry_time -
fake_clock_.TimeInMilliseconds());
hist_.SetRtt(1); // Trigger culling of old packets.
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum).has_value());
EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 1)).has_value());
EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value());
// Advance to where last packet expires, verify all gone.
fake_clock_.AdvanceTimeMilliseconds(33);
hist_.SetRtt(1); // Trigger culling of old packets.
EXPECT_FALSE(hist_.GetPacketState(kStartSeqNum).has_value());
EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 1)).has_value());
EXPECT_FALSE(hist_.GetPacketState(To16u(kStartSeqNum + 2)).has_value());
}
} // namespace webrtc

View File

@ -149,6 +149,9 @@ RTPSender::RTPSender(
populate_network2_timestamp_(populate_network2_timestamp),
send_side_bwe_with_overhead_(
field_trials.Lookup("WebRTC-SendSideBwe-WithOverhead")
.find("Enabled") == 0),
legacy_packet_history_storage_mode_(
field_trials.Lookup("WebRTC-UseRtpPacketHistoryLegacyStorageMode")
.find("Enabled") == 0) {
// This random initialization is not intended to be cryptographic strong.
timestamp_offset_ = random_.Rand<uint32_t>();
@ -159,9 +162,13 @@ RTPSender::RTPSender(
// Store FlexFEC packets in the packet history data structure, so they can
// be found when paced.
if (flexfec_ssrc_) {
RtpPacketHistory::StorageMode storage_mode =
legacy_packet_history_storage_mode_
? RtpPacketHistory::StorageMode::kStore
: RtpPacketHistory::StorageMode::kStoreAndCull;
flexfec_packet_history_.SetStorePacketsStatus(
RtpPacketHistory::StorageMode::kStore,
kMinFlexfecPacketsToStoreForPacing);
storage_mode, kMinFlexfecPacketsToStoreForPacing);
}
}
@ -423,9 +430,14 @@ size_t RTPSender::SendPadData(size_t bytes,
}
void RTPSender::SetStorePacketsStatus(bool enable, uint16_t number_to_store) {
RtpPacketHistory::StorageMode mode =
enable ? RtpPacketHistory::StorageMode::kStore
: RtpPacketHistory::StorageMode::kDisabled;
RtpPacketHistory::StorageMode mode;
if (enable) {
mode = legacy_packet_history_storage_mode_
? RtpPacketHistory::StorageMode::kStore
: RtpPacketHistory::StorageMode::kStoreAndCull;
} else {
mode = RtpPacketHistory::StorageMode::kDisabled;
}
packet_history_.SetStorePacketsStatus(mode, number_to_store);
}

View File

@ -292,6 +292,7 @@ class RTPSender {
const bool populate_network2_timestamp_;
const bool send_side_bwe_with_overhead_;
const bool legacy_packet_history_storage_mode_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RTPSender);
};