diff --git a/modules/rtp_rtcp/source/rtp_packet_history.cc b/modules/rtp_rtcp/source/rtp_packet_history.cc index b369f402fe..85689f9637 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history.cc @@ -23,7 +23,6 @@ namespace webrtc { constexpr size_t RtpPacketHistory::kMaxCapacity; -constexpr size_t RtpPacketHistory::kMaxPaddingtHistory; constexpr int64_t RtpPacketHistory::kMinPacketDurationMs; constexpr int RtpPacketHistory::kMinPacketDurationRtt; constexpr int RtpPacketHistory::kPacketCullingDelayFactor; @@ -131,28 +130,18 @@ void RtpPacketHistory::PutRtpPacket(std::unique_ptr packet, // Store packet. const uint16_t rtp_seq_no = packet->SequenceNumber(); - int packet_index = GetPacketIndex(rtp_seq_no); - RTC_DCHECK_GE(packet_index, 0) << "Out-of-order inserts not supported."; - size_t index = packet_index; + auto packet_it = packet_history_.emplace( + rtp_seq_no, + StoredPacket(std::move(packet), send_time_ms, packets_inserted_++)); + RTC_DCHECK(packet_it.second) << "Failed to insert packet in history."; + StoredPacket& stored_packet = packet_it.first->second; - while (packet_history_.size() < index) { - packet_history_.emplace_back(nullptr, absl::nullopt, 0); - } - RTC_DCHECK(packet_history_.size() == index || - packet_history_[index].packet_ == nullptr); - - if (packet_history_.size() <= index) { - packet_history_.emplace_back(std::move(packet), send_time_ms, - packets_inserted_++); - } else { - packet_history_[packet_index] = - StoredPacket(std::move(packet), send_time_ms, packets_inserted_++); + if (!start_seqno_) { + start_seqno_ = rtp_seq_no; } - if (padding_priority_.size() >= kMaxPaddingtHistory - 1) { - padding_priority_.erase(std::prev(padding_priority_.end())); - } - auto prio_it = padding_priority_.insert(&packet_history_[packet_index]); + // Store the sequence number of the last send packet with this size. + auto prio_it = padding_priority_.insert(&stored_packet); RTC_DCHECK(prio_it.second) << "Failed to insert packet into prio set."; } @@ -163,26 +152,27 @@ std::unique_ptr RtpPacketHistory::GetPacketAndSetSendTime( return nullptr; } - StoredPacket* packet = GetStoredPacket(sequence_number); - if (packet == nullptr) { - return nullptr; - } - int64_t now_ms = clock_->TimeInMilliseconds(); - if (!VerifyRtt(*packet, now_ms)) { + StoredPacketIterator rtp_it = packet_history_.find(sequence_number); + if (rtp_it == packet_history_.end()) { return nullptr; } - if (packet->send_time_ms_) { - packet->IncrementTimesRetransmitted(&padding_priority_); + StoredPacket& packet = rtp_it->second; + if (!VerifyRtt(rtp_it->second, now_ms)) { + return nullptr; + } + + if (packet.send_time_ms_) { + packet.IncrementTimesRetransmitted(&padding_priority_); } // Update send-time and mark as no long in pacer queue. - packet->send_time_ms_ = now_ms; - packet->pending_transmission_ = false; + packet.send_time_ms_ = now_ms; + packet.pending_transmission_ = false; - // Return copy of packet instance since it may need to be retransmitted. - return absl::make_unique(*packet->packet_); + // Return copy of packet instance since it may need to be retransmitted again. + return absl::make_unique(*packet.packet_); } std::unique_ptr RtpPacketHistory::GetPacketAndMarkAsPending( @@ -202,26 +192,29 @@ std::unique_ptr RtpPacketHistory::GetPacketAndMarkAsPending( return nullptr; } - StoredPacket* packet = GetStoredPacket(sequence_number); - if (packet == nullptr) { + int64_t now_ms = clock_->TimeInMilliseconds(); + StoredPacketIterator rtp_it = packet_history_.find(sequence_number); + if (rtp_it == packet_history_.end()) { return nullptr; } - if (packet->pending_transmission_) { + StoredPacket& packet = rtp_it->second; + + if (packet.pending_transmission_) { // Packet already in pacer queue, ignore this request. return nullptr; } - if (!VerifyRtt(*packet, clock_->TimeInMilliseconds())) { + if (!VerifyRtt(rtp_it->second, now_ms)) { // Packet already resent within too short a time window, ignore. return nullptr; } // Copy and/or encapsulate packet. std::unique_ptr encapsulated_packet = - encapsulate(*packet->packet_); + encapsulate(*packet.packet_); if (encapsulated_packet) { - packet->pending_transmission_ = true; + packet.pending_transmission_ = true; } return encapsulated_packet; @@ -233,18 +226,20 @@ void RtpPacketHistory::MarkPacketAsSent(uint16_t sequence_number) { return; } - StoredPacket* packet = GetStoredPacket(sequence_number); - if (packet == nullptr) { + int64_t now_ms = clock_->TimeInMilliseconds(); + StoredPacketIterator rtp_it = packet_history_.find(sequence_number); + if (rtp_it == packet_history_.end()) { return; } - RTC_DCHECK(packet->send_time_ms_); + StoredPacket& packet = rtp_it->second; + RTC_DCHECK(packet.send_time_ms_); // Update send-time, mark as no longer in pacer queue, and increment // transmission count. - packet->send_time_ms_ = clock_->TimeInMilliseconds(); - packet->pending_transmission_ = false; - packet->IncrementTimesRetransmitted(&padding_priority_); + packet.send_time_ms_ = now_ms; + packet.pending_transmission_ = false; + packet.IncrementTimesRetransmitted(&padding_priority_); } absl::optional RtpPacketHistory::GetPacketState( @@ -254,21 +249,16 @@ absl::optional RtpPacketHistory::GetPacketState( return absl::nullopt; } - int packet_index = GetPacketIndex(sequence_number); - if (packet_index < 0 || - static_cast(packet_index) >= packet_history_.size()) { - return absl::nullopt; - } - const StoredPacket& packet = packet_history_[packet_index]; - if (packet.packet_ == nullptr) { + auto rtp_it = packet_history_.find(sequence_number); + if (rtp_it == packet_history_.end()) { return absl::nullopt; } - if (!VerifyRtt(packet, clock_->TimeInMilliseconds())) { + if (!VerifyRtt(rtp_it->second, clock_->TimeInMilliseconds())) { return absl::nullopt; } - return StoredPacketToPacketState(packet); + return StoredPacketToPacketState(rtp_it->second); } bool RtpPacketHistory::VerifyRtt(const RtpPacketHistory::StoredPacket& packet, @@ -327,13 +317,15 @@ std::unique_ptr RtpPacketHistory::GetPayloadPaddingPacket( void RtpPacketHistory::CullAcknowledgedPackets( rtc::ArrayView sequence_numbers) { rtc::CritScope cs(&lock_); + if (mode_ == StorageMode::kDisabled) { + return; + } + for (uint16_t sequence_number : sequence_numbers) { - int packet_index = GetPacketIndex(sequence_number); - if (packet_index < 0 || - static_cast(packet_index) >= packet_history_.size()) { - continue; + auto stored_packet_it = packet_history_.find(sequence_number); + if (stored_packet_it != packet_history_.end()) { + RemovePacket(stored_packet_it); } - RemovePacket(packet_index); } } @@ -343,12 +335,12 @@ bool RtpPacketHistory::SetPendingTransmission(uint16_t sequence_number) { return false; } - StoredPacket* packet = GetStoredPacket(sequence_number); - if (packet == nullptr) { + auto rtp_it = packet_history_.find(sequence_number); + if (rtp_it == packet_history_.end()) { return false; } - packet->pending_transmission_ = true; + rtp_it->second.pending_transmission_ = true; return true; } @@ -360,21 +352,25 @@ void RtpPacketHistory::Clear() { void RtpPacketHistory::Reset() { packet_history_.clear(); padding_priority_.clear(); + start_seqno_.reset(); } void RtpPacketHistory::CullOldPackets(int64_t now_ms) { int64_t packet_duration_ms = std::max(kMinPacketDurationRtt * rtt_ms_, kMinPacketDurationMs); while (!packet_history_.empty()) { + auto stored_packet_it = packet_history_.find(*start_seqno_); + RTC_DCHECK(stored_packet_it != packet_history_.end()); + if (packet_history_.size() >= kMaxCapacity) { // We have reached the absolute max capacity, remove one packet // unconditionally. - RemovePacket(0); + RemovePacket(stored_packet_it); continue; } - const StoredPacket& stored_packet = packet_history_.front(); - if (stored_packet.pending_transmission_) { + const StoredPacket& stored_packet = stored_packet_it->second; + if (stored_packet_it->second.pending_transmission_) { // Don't remove packets in the pacer queue, pending tranmission. return; } @@ -390,7 +386,7 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) { now_ms) { // Too many packets in history, or this packet has timed out. Remove it // and continue. - RemovePacket(0); + RemovePacket(stored_packet_it); } else { // No more packets can be removed right now. return; @@ -399,57 +395,46 @@ void RtpPacketHistory::CullOldPackets(int64_t now_ms) { } std::unique_ptr RtpPacketHistory::RemovePacket( - int packet_index) { + StoredPacketIterator packet_it) { // Move the packet out from the StoredPacket container. std::unique_ptr rtp_packet = - std::move(packet_history_[packet_index].packet_); + std::move(packet_it->second.packet_); + + // Check if this is the oldest packet in the history, as this must be updated + // in order to cull old packets. + const bool is_first_packet = packet_it->first == start_seqno_; // Erase from padding priority set, if eligible. - padding_priority_.erase(&packet_history_[packet_index]); + size_t num_erased = padding_priority_.erase(&packet_it->second); + RTC_DCHECK_EQ(num_erased, 1) + << "Failed to remove one packet from prio set, got " << num_erased; + if (num_erased != 1) { + RTC_LOG(LS_ERROR) << "RtpPacketHistory in inconsistent state, resetting."; + Reset(); + return nullptr; + } - if (packet_index == 0) { - while (!packet_history_.empty() && - packet_history_.front().packet_ == nullptr) { - packet_history_.pop_front(); + // Erase the packet from the map, and capture iterator to the next one. + StoredPacketIterator next_it = packet_history_.erase(packet_it); + + if (is_first_packet) { + // |next_it| now points to the next element, or to the end. If the end, + // check if we can wrap around. + if (next_it == packet_history_.end()) { + next_it = packet_history_.begin(); + } + + // Update |start_seq_no| to the new oldest item. + if (next_it != packet_history_.end()) { + start_seqno_ = next_it->first; + } else { + start_seqno_.reset(); } } return rtp_packet; } -int RtpPacketHistory::GetPacketIndex(uint16_t sequence_number) const { - if (packet_history_.empty()) { - return 0; - } - - RTC_DCHECK(packet_history_.front().packet_ != nullptr); - int first_seq = packet_history_.front().packet_->SequenceNumber(); - if (first_seq == sequence_number) { - return 0; - } - - if (IsNewerSequenceNumber(sequence_number, first_seq)) { - // New packet is ahead of start of list. Find the delta. - int packet_index = sequence_number - first_seq; - if (packet_index < 0) { - // A wrap-around has occurred, unwrap to get a valid index. - packet_index += 1 << 16; - } - return packet_index; - } - - return -1; -} - -RtpPacketHistory::StoredPacket* RtpPacketHistory::GetStoredPacket( - uint16_t sequence_number) { - int index = GetPacketIndex(sequence_number); - if (index < 0 || static_cast(index) >= packet_history_.size()) { - return nullptr; - } - return &packet_history_[index]; -} - RtpPacketHistory::PacketState RtpPacketHistory::StoredPacketToPacketState( const RtpPacketHistory::StoredPacket& stored_packet) { RtpPacketHistory::PacketState state; diff --git a/modules/rtp_rtcp/source/rtp_packet_history.h b/modules/rtp_rtcp/source/rtp_packet_history.h index 9253ede4fa..4850c7538c 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history.h +++ b/modules/rtp_rtcp/source/rtp_packet_history.h @@ -11,7 +11,6 @@ #ifndef MODULES_RTP_RTCP_SOURCE_RTP_PACKET_HISTORY_H_ #define MODULES_RTP_RTCP_SOURCE_RTP_PACKET_HISTORY_H_ -#include #include #include #include @@ -54,8 +53,6 @@ class RtpPacketHistory { // Maximum number of packets we ever allow in the history. static constexpr size_t kMaxCapacity = 9600; - // Maximum number of entries in prioritized queue of padding packets. - static constexpr size_t kMaxPaddingtHistory = 63; // Don't remove packets within max(1000ms, 3x RTT). static constexpr int64_t kMinPacketDurationMs = 1000; static constexpr int kMinPacketDurationRtt = 3; @@ -174,6 +171,8 @@ class RtpPacketHistory { bool operator()(StoredPacket* lhs, StoredPacket* rhs) const; }; + using StoredPacketIterator = std::map::iterator; + // Helper method used by GetPacketAndSetSendTime() and GetPacketState() to // check if packet has too recently been sent. bool VerifyRtt(const StoredPacket& packet, int64_t now_ms) const @@ -182,11 +181,7 @@ class RtpPacketHistory { void CullOldPackets(int64_t now_ms) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); // Removes the packet from the history, and context/mapping that has been // stored. Returns the RTP packet instance contained within the StoredPacket. - std::unique_ptr RemovePacket(int packet_index) - RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); - int GetPacketIndex(uint16_t sequence_number) const - RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); - StoredPacket* GetStoredPacket(uint16_t sequence_number) + std::unique_ptr RemovePacket(StoredPacketIterator packet) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_); static PacketState StoredPacketToPacketState( const StoredPacket& stored_packet); @@ -197,13 +192,8 @@ class RtpPacketHistory { StorageMode mode_ RTC_GUARDED_BY(lock_); int64_t rtt_ms_ RTC_GUARDED_BY(lock_); - // Queue of stored packets, ordered by sequence number, with older packets in - // the front and new packets being added to the back. Note that there may be - // wrap-arounds so the back may have a lower sequence number. - // Packets may also be removed out-of-order, in which case there will be - // instances of StoredPacket with |packet_| set to nullptr. The first and last - // entry in the queue will however always be populated. - std::deque packet_history_ RTC_GUARDED_BY(lock_); + // Map from rtp sequence numbers to stored packet. + std::map packet_history_ RTC_GUARDED_BY(lock_); // Total number of packets with inserted. uint64_t packets_inserted_ RTC_GUARDED_BY(lock_); @@ -211,6 +201,10 @@ class RtpPacketHistory { // in GetPayloadPaddingPacket(). PacketPrioritySet padding_priority_ RTC_GUARDED_BY(lock_); + // The earliest packet in the history. This might not be the lowest sequence + // number, in case there is a wraparound. + absl::optional start_seqno_ RTC_GUARDED_BY(lock_); + RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtpPacketHistory); }; } // namespace webrtc diff --git a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc index 242af16ed8..0523ed2ba9 100644 --- a/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc +++ b/modules/rtp_rtcp/source/rtp_packet_history_unittest.cc @@ -291,38 +291,6 @@ TEST_F(RtpPacketHistoryTest, RemovesOldestPacketWhenAtMaxCapacity) { EXPECT_TRUE(hist_.GetPacketState(To16u(kStartSeqNum + 1))); } -TEST_F(RtpPacketHistoryTest, RemovesLowestPrioPaddingWhenAtMaxCapacity) { - // Tests the absolute upper bound on number of packets in the prioritized - // set of potential padding packets. - const size_t kMaxNumPackets = RtpPacketHistory::kMaxPaddingtHistory; - hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, kMaxNumPackets * 2); - hist_.SetRtt(1); - - // Add packets until the max is reached, and then yet another one. - for (size_t i = 0; i < kMaxNumPackets + 1; ++i) { - std::unique_ptr packet = - CreateRtpPacket(To16u(kStartSeqNum + i)); - // Don't mark packets as sent, preventing them from being removed. - hist_.PutRtpPacket(std::move(packet), fake_clock_.TimeInMilliseconds()); - } - - // Advance time to allow retransmission/padding. - fake_clock_.AdvanceTimeMilliseconds(1); - - // The oldest packet will be least prioritized and has fallen out of the - // priority set. - for (size_t i = kMaxNumPackets - 1; i > 0; --i) { - auto packet = hist_.GetPayloadPaddingPacket(); - ASSERT_TRUE(packet); - EXPECT_EQ(packet->SequenceNumber(), To16u(kStartSeqNum + i + 1)); - } - - // Wrap around to newest padding packet again. - auto packet = hist_.GetPayloadPaddingPacket(); - ASSERT_TRUE(packet); - EXPECT_EQ(packet->SequenceNumber(), To16u(kStartSeqNum + kMaxNumPackets)); -} - TEST_F(RtpPacketHistoryTest, DontRemoveUnsentPackets) { const size_t kMaxNumPackets = 10; hist_.SetStorePacketsStatus(StorageMode::kStoreAndCull, kMaxNumPackets);