diff --git a/modules/remote_bitrate_estimator/packet_arrival_map.cc b/modules/remote_bitrate_estimator/packet_arrival_map.cc index 09c9e5aed1..16d400e227 100644 --- a/modules/remote_bitrate_estimator/packet_arrival_map.cc +++ b/modules/remote_bitrate_estimator/packet_arrival_map.cc @@ -10,118 +10,184 @@ #include "modules/remote_bitrate_estimator/packet_arrival_map.h" #include +#include -#include "rtc_base/numerics/safe_minmax.h" +#include "api/units/timestamp.h" +#include "rtc_base/checks.h" namespace webrtc { -constexpr size_t PacketArrivalTimeMap::kMaxNumberOfPackets; - void PacketArrivalTimeMap::AddPacket(int64_t sequence_number, Timestamp arrival_time) { RTC_DCHECK_GE(arrival_time, Timestamp::Zero()); - if (!has_seen_packet_) { + if (!has_seen_packet()) { // First packet. - has_seen_packet_ = true; + Reallocate(kMinCapacity); begin_sequence_number_ = sequence_number; - arrival_times_.push_back(arrival_time); + end_sequence_number_ = sequence_number + 1; + arrival_times_[Index(sequence_number)] = arrival_time; return; } - int64_t pos = sequence_number - begin_sequence_number_; - if (pos >= 0 && pos < static_cast(arrival_times_.size())) { + if (sequence_number >= begin_sequence_number() && + sequence_number < end_sequence_number()) { // The packet is within the buffer - no need to expand it. - arrival_times_[pos] = arrival_time; + arrival_times_[Index(sequence_number)] = arrival_time; return; } - if (pos < 0) { + if (sequence_number < begin_sequence_number()) { // The packet goes before the current buffer. Expand to add packet, but only // if it fits within kMaxNumberOfPackets. - size_t missing_packets = -pos; - if (missing_packets + arrival_times_.size() > kMaxNumberOfPackets) { + int64_t new_size = end_sequence_number() - sequence_number; + if (new_size > kMaxNumberOfPackets) { // Don't expand the buffer further, as that would remove newly received // packets. return; } + AdjustToSize(new_size); - arrival_times_.insert(arrival_times_.begin(), missing_packets, - Timestamp::MinusInfinity()); - arrival_times_[0] = arrival_time; + arrival_times_[Index(sequence_number)] = arrival_time; + SetNotReceived(sequence_number + 1, begin_sequence_number_); begin_sequence_number_ = sequence_number; return; } // The packet goes after the buffer. + RTC_DCHECK_GE(sequence_number, end_sequence_number_); + int64_t new_end_sequence_number = sequence_number + 1; - if (static_cast(pos) >= kMaxNumberOfPackets) { - // The buffer grows too large - old packets have to be removed. - size_t packets_to_remove = pos - kMaxNumberOfPackets + 1; - if (packets_to_remove >= arrival_times_.size()) { - arrival_times_.clear(); - begin_sequence_number_ = sequence_number; - pos = 0; - } else { - // Also trim the buffer to remove leading non-received packets, to - // ensure that the buffer only spans received packets. - while (packets_to_remove < arrival_times_.size() && - arrival_times_[packets_to_remove].IsInfinite()) { - ++packets_to_remove; - } - - arrival_times_.erase(arrival_times_.begin(), - arrival_times_.begin() + packets_to_remove); - begin_sequence_number_ += packets_to_remove; - pos -= packets_to_remove; - RTC_DCHECK_GE(pos, 0); - } + if (new_end_sequence_number >= end_sequence_number_ + kMaxNumberOfPackets) { + // All old packets have to be removed. + begin_sequence_number_ = sequence_number; + end_sequence_number_ = new_end_sequence_number; + arrival_times_[Index(sequence_number)] = arrival_time; + return; } + if (begin_sequence_number_ < new_end_sequence_number - kMaxNumberOfPackets) { + // Remove oldest entries + begin_sequence_number_ = new_end_sequence_number - kMaxNumberOfPackets; + RTC_DCHECK_GT(end_sequence_number_, begin_sequence_number_); + // Also trim the buffer to remove leading non-received packets, to + // ensure that the buffer only spans received packets. + TrimLeadingNotReceivedEntries(); + } + + AdjustToSize(new_end_sequence_number - begin_sequence_number_); + // Packets can be received out-of-order. If this isn't the next expected // packet, add enough placeholders to fill the gap. - size_t missing_gap_packets = pos - arrival_times_.size(); - if (missing_gap_packets > 0) { - arrival_times_.insert(arrival_times_.end(), missing_gap_packets, - Timestamp::MinusInfinity()); + SetNotReceived(end_sequence_number_, sequence_number); + end_sequence_number_ = new_end_sequence_number; + arrival_times_[Index(sequence_number)] = arrival_time; +} + +void PacketArrivalTimeMap::TrimLeadingNotReceivedEntries() { + const int begin_index = Index(begin_sequence_number_); + const Timestamp* const begin_it = arrival_times_.get() + begin_index; + const Timestamp* const end_it = arrival_times_.get() + capacity(); + + for (const Timestamp* it = begin_it; it != end_it; ++it) { + if (*it >= Timestamp::Zero()) { + begin_sequence_number_ += (it - begin_it); + return; + } + } + // Reached end of the arrival_times_ and all entries represent not received + // packets. Remove them. + begin_sequence_number_ += (capacity() - begin_index); + // Continue removing entries at the beginning of the circular buffer. + for (const Timestamp* it = arrival_times_.get(); it != begin_it; ++it) { + if (*it >= Timestamp::Zero()) { + begin_sequence_number_ += (it - arrival_times_.get()); + return; + } + } + + RTC_DCHECK_NOTREACHED() << "There should be at least one non-empty entry"; +} + +void PacketArrivalTimeMap::SetNotReceived( + int64_t begin_sequence_number_inclusive, + int64_t end_sequence_number_exclusive) { + static constexpr Timestamp value = Timestamp::MinusInfinity(); + + int begin_index = Index(begin_sequence_number_inclusive); + int end_index = Index(end_sequence_number_exclusive); + + if (begin_index <= end_index) { + // Entries to clear are in single block: + // [......{-----}....] + std::fill(arrival_times_.get() + begin_index, + arrival_times_.get() + end_index, value); + } else { + // Entries to clear span across arrival_times_ border: + // [--}..........{---] + std::fill(arrival_times_.get() + begin_index, + arrival_times_.get() + capacity(), value); + std::fill(arrival_times_.get(), arrival_times_.get() + end_index, value); } - RTC_DCHECK_EQ(arrival_times_.size(), pos); - arrival_times_.push_back(arrival_time); - RTC_DCHECK_LE(arrival_times_.size(), kMaxNumberOfPackets); } void PacketArrivalTimeMap::RemoveOldPackets(int64_t sequence_number, Timestamp arrival_time_limit) { - while (!arrival_times_.empty() && begin_sequence_number_ < sequence_number && - arrival_times_.front() <= arrival_time_limit) { - arrival_times_.pop_front(); + int64_t check_to = std::min(sequence_number, end_sequence_number_); + while (begin_sequence_number_ < check_to && + arrival_times_[Index(begin_sequence_number_)] <= arrival_time_limit) { ++begin_sequence_number_; } -} - -bool PacketArrivalTimeMap::has_received(int64_t sequence_number) const { - int64_t pos = sequence_number - begin_sequence_number_; - if (pos >= 0 && pos < static_cast(arrival_times_.size()) && - arrival_times_[pos].IsFinite()) { - return true; - } - return false; + AdjustToSize(end_sequence_number_ - begin_sequence_number_); } void PacketArrivalTimeMap::EraseTo(int64_t sequence_number) { - if (sequence_number > begin_sequence_number_) { - size_t count = - std::min(static_cast(sequence_number - begin_sequence_number_), - arrival_times_.size()); - - arrival_times_.erase(arrival_times_.begin(), - arrival_times_.begin() + count); - begin_sequence_number_ += count; + if (sequence_number < begin_sequence_number_) { + return; } + if (sequence_number >= end_sequence_number_) { + // Erase all. + begin_sequence_number_ = end_sequence_number_; + return; + } + // Remove some. + begin_sequence_number_ = sequence_number; + RTC_DCHECK(has_received(begin_sequence_number_)); + AdjustToSize(end_sequence_number_ - begin_sequence_number_); } -int64_t PacketArrivalTimeMap::clamp(int64_t sequence_number) const { - return rtc::SafeClamp(sequence_number, begin_sequence_number(), - end_sequence_number()); +void PacketArrivalTimeMap::AdjustToSize(int new_size) { + if (new_size > capacity()) { + int new_capacity = capacity(); + while (new_capacity < new_size) + new_capacity *= 2; + Reallocate(new_capacity); + } + if (capacity() > std::max(kMinCapacity, 4 * new_size)) { + int new_capacity = capacity(); + while (new_capacity > 2 * std::max(new_size, kMinCapacity)) { + new_capacity /= 2; + } + Reallocate(new_capacity); + } + RTC_DCHECK_LE(new_size, capacity()); +} + +void PacketArrivalTimeMap::Reallocate(int new_capacity) { + int new_capacity_minus_1 = new_capacity - 1; + // Check capacity is a power of 2. + RTC_DCHECK_EQ(new_capacity & new_capacity_minus_1, 0); + // Create uninitialized memory. + // All valid entries should be set by `AddPacket` before use. + void* raw = operator new[](new_capacity * sizeof(Timestamp)); + Timestamp* new_buffer = static_cast(raw); + + for (int64_t sequence_number = begin_sequence_number_; + sequence_number < end_sequence_number_; ++sequence_number) { + new_buffer[sequence_number & new_capacity_minus_1] = + arrival_times_[sequence_number & capacity_minus_1_]; + } + arrival_times_.reset(new_buffer); + capacity_minus_1_ = new_capacity_minus_1; } } // namespace webrtc diff --git a/modules/remote_bitrate_estimator/packet_arrival_map.h b/modules/remote_bitrate_estimator/packet_arrival_map.h index 8bda4a86e1..d489a0c53d 100644 --- a/modules/remote_bitrate_estimator/packet_arrival_map.h +++ b/modules/remote_bitrate_estimator/packet_arrival_map.h @@ -10,9 +10,10 @@ #ifndef MODULES_REMOTE_BITRATE_ESTIMATOR_PACKET_ARRIVAL_MAP_H_ #define MODULES_REMOTE_BITRATE_ESTIMATOR_PACKET_ARRIVAL_MAP_H_ +#include #include #include -#include +#include #include "api/units/timestamp.h" #include "rtc_base/checks.h" @@ -32,10 +33,19 @@ class PacketArrivalTimeMap { public: // Impossible to request feedback older than what can be represented by 15 // bits. - static constexpr size_t kMaxNumberOfPackets = (1 << 15); + static constexpr int kMaxNumberOfPackets = (1 << 15); + + PacketArrivalTimeMap() = default; + PacketArrivalTimeMap(const PacketArrivalTimeMap&) = delete; + PacketArrivalTimeMap& operator=(const PacketArrivalTimeMap&) = delete; + ~PacketArrivalTimeMap() = default; // Indicates if the packet with `sequence_number` has already been received. - bool has_received(int64_t sequence_number) const; + bool has_received(int64_t sequence_number) const { + return sequence_number >= begin_sequence_number() && + sequence_number < end_sequence_number() && + arrival_times_[Index(sequence_number)] >= Timestamp::Zero(); + } // Returns the sequence number of the first entry in the map, i.e. the // sequence number that a `begin()` iterator would represent. @@ -43,21 +53,22 @@ class PacketArrivalTimeMap { // Returns the sequence number of the element just after the map, i.e. the // sequence number that an `end()` iterator would represent. - int64_t end_sequence_number() const { - return begin_sequence_number_ + arrival_times_.size(); - } + int64_t end_sequence_number() const { return end_sequence_number_; } // Returns an element by `sequence_number`, which must be valid, i.e. // between [begin_sequence_number, end_sequence_number). Timestamp get(int64_t sequence_number) { - int64_t pos = sequence_number - begin_sequence_number_; - RTC_DCHECK(pos >= 0 && pos < static_cast(arrival_times_.size())); - return arrival_times_[pos]; + RTC_DCHECK_GE(sequence_number, begin_sequence_number()); + RTC_DCHECK_LT(sequence_number, end_sequence_number()); + return arrival_times_[Index(sequence_number)]; } // Clamps `sequence_number` between [begin_sequence_number, // end_sequence_number]. - int64_t clamp(int64_t sequence_number) const; + int64_t clamp(int64_t sequence_number) const { + return std::clamp(sequence_number, begin_sequence_number(), + end_sequence_number()); + } // Erases all elements from the beginning of the map until `sequence_number`. void EraseTo(int64_t sequence_number); @@ -71,17 +82,44 @@ class PacketArrivalTimeMap { void RemoveOldPackets(int64_t sequence_number, Timestamp arrival_time_limit); private: - // Deque representing unwrapped sequence number -> time, where the index + - // `begin_sequence_number_` represents the packet's sequence number. - std::deque arrival_times_; + static constexpr int kMinCapacity = 128; - // The unwrapped sequence number for the first element in - // `arrival_times_`. + // Returns index in the `arrival_times_` for value for `sequence_number`. + int Index(int64_t sequence_number) const { + // Note that sequence_number might be negative, thus taking '%' requires + // extra handling and can be slow. Because capacity is a power of two, it + // is much faster to use '&' operator. + return sequence_number & capacity_minus_1_; + } + + void SetNotReceived(int64_t begin_sequence_number_inclusive, + int64_t end_sequence_number_exclusive); + + void TrimLeadingNotReceivedEntries(); + + // Adjust capacity to match new_size, may reduce capacity. + // On return guarantees capacity >= new_size. + void AdjustToSize(int new_size); + void Reallocate(int new_capacity); + + int capacity() const { return capacity_minus_1_ + 1; } + bool has_seen_packet() const { return arrival_times_ != nullptr; } + + // Circular buffer. Packet with sequence number `sequence_number` + // is stored in the slot `sequence_number % capacity_` + std::unique_ptr arrival_times_ = nullptr; + + // Allocated size of the `arrival_times_` + // capacity_ is a power of 2 in range [kMinCapacity, kMaxNumberOfPackets] + // `capacity - 1` is used much more often than `capacity`, thus that value is + // stored. + int capacity_minus_1_ = -1; + + // The unwrapped sequence number for valid range of sequence numbers. + // arrival_times_ entries only valid for sequence numbers in range + // `begin_sequence_number_ <= sequence_number < end_sequence_number_` int64_t begin_sequence_number_ = 0; - - // Indicates if this map has had any packet added to it. The first packet - // decides the initial sequence number. - bool has_seen_packet_ = false; + int64_t end_sequence_number_ = 0; }; } // namespace webrtc diff --git a/modules/remote_bitrate_estimator/packet_arrival_map_test.cc b/modules/remote_bitrate_estimator/packet_arrival_map_test.cc index c083daa300..00c927ffd7 100644 --- a/modules/remote_bitrate_estimator/packet_arrival_map_test.cc +++ b/modules/remote_bitrate_estimator/packet_arrival_map_test.cc @@ -102,8 +102,7 @@ TEST(PacketArrivalMapTest, GrowsBufferAndRemoveOld) { EXPECT_EQ(map.begin_sequence_number(), 43); EXPECT_EQ(map.end_sequence_number(), kLargeSeq + 1); - EXPECT_EQ(static_cast(map.end_sequence_number() - - map.begin_sequence_number()), + EXPECT_EQ(map.end_sequence_number() - map.begin_sequence_number(), PacketArrivalTimeMap::kMaxNumberOfPackets); EXPECT_FALSE(map.has_received(41)); diff --git a/modules/remote_bitrate_estimator/remote_estimator_proxy.cc b/modules/remote_bitrate_estimator/remote_estimator_proxy.cc index 1bb2f5532b..b83720d1a8 100644 --- a/modules/remote_bitrate_estimator/remote_estimator_proxy.cc +++ b/modules/remote_bitrate_estimator/remote_estimator_proxy.cc @@ -82,7 +82,7 @@ void RemoteEstimatorProxy::MaybeCullOldPackets(int64_t sequence_number, void RemoteEstimatorProxy::IncomingPacket(int64_t arrival_time_ms, size_t payload_size, const RTPHeader& header) { - if (arrival_time_ms < 0 || arrival_time_ms > kMaxTimeMs) { + if (arrival_time_ms < 0 || arrival_time_ms >= kMaxTimeMs) { RTC_LOG(LS_WARNING) << "Arrival time out of bounds: " << arrival_time_ms; return; } @@ -292,7 +292,7 @@ RemoteEstimatorProxy::MaybeBuildFeedbackPacket( for (int64_t seq = start_seq; seq < end_seq; ++seq) { Timestamp arrival_time = packet_arrival_times_.get(seq); - if (arrival_time.IsInfinite()) { + if (arrival_time < Timestamp::Zero()) { // Packet not received. continue; }