Improve PacketArrivalTimeMap perfomance

replace std::deque implementation with a manually controlled circular buffer.
replace Timestamp validity check from 'IsInfinite()' accesser to cheaper comparison to zero.
These greatly increase PacketArrivalTimeMap::AddPacket perfomance when packet arrive with large sequence number gaps.

Bug: chromium:1349880
Change-Id: I6f4e814b1086ca9d0b48608531e3a387d9e542dc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/270564
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37722}
This commit is contained in:
Danil Chapovalov
2022-08-08 12:28:45 +02:00
committed by WebRTC LUCI CQ
parent 39ae69690e
commit 7f0a7acb0a
4 changed files with 192 additions and 89 deletions

View File

@ -10,118 +10,184 @@
#include "modules/remote_bitrate_estimator/packet_arrival_map.h" #include "modules/remote_bitrate_estimator/packet_arrival_map.h"
#include <algorithm> #include <algorithm>
#include <cstdint>
#include "rtc_base/numerics/safe_minmax.h" #include "api/units/timestamp.h"
#include "rtc_base/checks.h"
namespace webrtc { namespace webrtc {
constexpr size_t PacketArrivalTimeMap::kMaxNumberOfPackets;
void PacketArrivalTimeMap::AddPacket(int64_t sequence_number, void PacketArrivalTimeMap::AddPacket(int64_t sequence_number,
Timestamp arrival_time) { Timestamp arrival_time) {
RTC_DCHECK_GE(arrival_time, Timestamp::Zero()); RTC_DCHECK_GE(arrival_time, Timestamp::Zero());
if (!has_seen_packet_) { if (!has_seen_packet()) {
// First packet. // First packet.
has_seen_packet_ = true; Reallocate(kMinCapacity);
begin_sequence_number_ = sequence_number; begin_sequence_number_ = sequence_number;
arrival_times_.push_back(arrival_time); end_sequence_number_ = sequence_number + 1;
arrival_times_[Index(sequence_number)] = arrival_time;
return; return;
} }
int64_t pos = sequence_number - begin_sequence_number_; if (sequence_number >= begin_sequence_number() &&
if (pos >= 0 && pos < static_cast<int64_t>(arrival_times_.size())) { sequence_number < end_sequence_number()) {
// The packet is within the buffer - no need to expand it. // The packet is within the buffer - no need to expand it.
arrival_times_[pos] = arrival_time; arrival_times_[Index(sequence_number)] = arrival_time;
return; return;
} }
if (pos < 0) { if (sequence_number < begin_sequence_number()) {
// The packet goes before the current buffer. Expand to add packet, but only // The packet goes before the current buffer. Expand to add packet, but only
// if it fits within kMaxNumberOfPackets. // if it fits within kMaxNumberOfPackets.
size_t missing_packets = -pos; int64_t new_size = end_sequence_number() - sequence_number;
if (missing_packets + arrival_times_.size() > kMaxNumberOfPackets) { if (new_size > kMaxNumberOfPackets) {
// Don't expand the buffer further, as that would remove newly received // Don't expand the buffer further, as that would remove newly received
// packets. // packets.
return; return;
} }
AdjustToSize(new_size);
arrival_times_.insert(arrival_times_.begin(), missing_packets, arrival_times_[Index(sequence_number)] = arrival_time;
Timestamp::MinusInfinity()); SetNotReceived(sequence_number + 1, begin_sequence_number_);
arrival_times_[0] = arrival_time;
begin_sequence_number_ = sequence_number; begin_sequence_number_ = sequence_number;
return; return;
} }
// The packet goes after the buffer. // The packet goes after the buffer.
RTC_DCHECK_GE(sequence_number, end_sequence_number_);
int64_t new_end_sequence_number = sequence_number + 1;
if (static_cast<size_t>(pos) >= kMaxNumberOfPackets) { if (new_end_sequence_number >= end_sequence_number_ + kMaxNumberOfPackets) {
// The buffer grows too large - old packets have to be removed. // All old packets have to be removed.
size_t packets_to_remove = pos - kMaxNumberOfPackets + 1; begin_sequence_number_ = sequence_number;
if (packets_to_remove >= arrival_times_.size()) { end_sequence_number_ = new_end_sequence_number;
arrival_times_.clear(); arrival_times_[Index(sequence_number)] = arrival_time;
begin_sequence_number_ = sequence_number; return;
pos = 0;
} else {
// Also trim the buffer to remove leading non-received packets, to
// ensure that the buffer only spans received packets.
while (packets_to_remove < arrival_times_.size() &&
arrival_times_[packets_to_remove].IsInfinite()) {
++packets_to_remove;
}
arrival_times_.erase(arrival_times_.begin(),
arrival_times_.begin() + packets_to_remove);
begin_sequence_number_ += packets_to_remove;
pos -= packets_to_remove;
RTC_DCHECK_GE(pos, 0);
}
} }
if (begin_sequence_number_ < new_end_sequence_number - kMaxNumberOfPackets) {
// Remove oldest entries
begin_sequence_number_ = new_end_sequence_number - kMaxNumberOfPackets;
RTC_DCHECK_GT(end_sequence_number_, begin_sequence_number_);
// Also trim the buffer to remove leading non-received packets, to
// ensure that the buffer only spans received packets.
TrimLeadingNotReceivedEntries();
}
AdjustToSize(new_end_sequence_number - begin_sequence_number_);
// Packets can be received out-of-order. If this isn't the next expected // Packets can be received out-of-order. If this isn't the next expected
// packet, add enough placeholders to fill the gap. // packet, add enough placeholders to fill the gap.
size_t missing_gap_packets = pos - arrival_times_.size(); SetNotReceived(end_sequence_number_, sequence_number);
if (missing_gap_packets > 0) { end_sequence_number_ = new_end_sequence_number;
arrival_times_.insert(arrival_times_.end(), missing_gap_packets, arrival_times_[Index(sequence_number)] = arrival_time;
Timestamp::MinusInfinity()); }
void PacketArrivalTimeMap::TrimLeadingNotReceivedEntries() {
const int begin_index = Index(begin_sequence_number_);
const Timestamp* const begin_it = arrival_times_.get() + begin_index;
const Timestamp* const end_it = arrival_times_.get() + capacity();
for (const Timestamp* it = begin_it; it != end_it; ++it) {
if (*it >= Timestamp::Zero()) {
begin_sequence_number_ += (it - begin_it);
return;
}
}
// Reached end of the arrival_times_ and all entries represent not received
// packets. Remove them.
begin_sequence_number_ += (capacity() - begin_index);
// Continue removing entries at the beginning of the circular buffer.
for (const Timestamp* it = arrival_times_.get(); it != begin_it; ++it) {
if (*it >= Timestamp::Zero()) {
begin_sequence_number_ += (it - arrival_times_.get());
return;
}
}
RTC_DCHECK_NOTREACHED() << "There should be at least one non-empty entry";
}
void PacketArrivalTimeMap::SetNotReceived(
int64_t begin_sequence_number_inclusive,
int64_t end_sequence_number_exclusive) {
static constexpr Timestamp value = Timestamp::MinusInfinity();
int begin_index = Index(begin_sequence_number_inclusive);
int end_index = Index(end_sequence_number_exclusive);
if (begin_index <= end_index) {
// Entries to clear are in single block:
// [......{-----}....]
std::fill(arrival_times_.get() + begin_index,
arrival_times_.get() + end_index, value);
} else {
// Entries to clear span across arrival_times_ border:
// [--}..........{---]
std::fill(arrival_times_.get() + begin_index,
arrival_times_.get() + capacity(), value);
std::fill(arrival_times_.get(), arrival_times_.get() + end_index, value);
} }
RTC_DCHECK_EQ(arrival_times_.size(), pos);
arrival_times_.push_back(arrival_time);
RTC_DCHECK_LE(arrival_times_.size(), kMaxNumberOfPackets);
} }
void PacketArrivalTimeMap::RemoveOldPackets(int64_t sequence_number, void PacketArrivalTimeMap::RemoveOldPackets(int64_t sequence_number,
Timestamp arrival_time_limit) { Timestamp arrival_time_limit) {
while (!arrival_times_.empty() && begin_sequence_number_ < sequence_number && int64_t check_to = std::min(sequence_number, end_sequence_number_);
arrival_times_.front() <= arrival_time_limit) { while (begin_sequence_number_ < check_to &&
arrival_times_.pop_front(); arrival_times_[Index(begin_sequence_number_)] <= arrival_time_limit) {
++begin_sequence_number_; ++begin_sequence_number_;
} }
} AdjustToSize(end_sequence_number_ - begin_sequence_number_);
bool PacketArrivalTimeMap::has_received(int64_t sequence_number) const {
int64_t pos = sequence_number - begin_sequence_number_;
if (pos >= 0 && pos < static_cast<int64_t>(arrival_times_.size()) &&
arrival_times_[pos].IsFinite()) {
return true;
}
return false;
} }
void PacketArrivalTimeMap::EraseTo(int64_t sequence_number) { void PacketArrivalTimeMap::EraseTo(int64_t sequence_number) {
if (sequence_number > begin_sequence_number_) { if (sequence_number < begin_sequence_number_) {
size_t count = return;
std::min(static_cast<size_t>(sequence_number - begin_sequence_number_),
arrival_times_.size());
arrival_times_.erase(arrival_times_.begin(),
arrival_times_.begin() + count);
begin_sequence_number_ += count;
} }
if (sequence_number >= end_sequence_number_) {
// Erase all.
begin_sequence_number_ = end_sequence_number_;
return;
}
// Remove some.
begin_sequence_number_ = sequence_number;
RTC_DCHECK(has_received(begin_sequence_number_));
AdjustToSize(end_sequence_number_ - begin_sequence_number_);
} }
int64_t PacketArrivalTimeMap::clamp(int64_t sequence_number) const { void PacketArrivalTimeMap::AdjustToSize(int new_size) {
return rtc::SafeClamp(sequence_number, begin_sequence_number(), if (new_size > capacity()) {
end_sequence_number()); int new_capacity = capacity();
while (new_capacity < new_size)
new_capacity *= 2;
Reallocate(new_capacity);
}
if (capacity() > std::max(kMinCapacity, 4 * new_size)) {
int new_capacity = capacity();
while (new_capacity > 2 * std::max(new_size, kMinCapacity)) {
new_capacity /= 2;
}
Reallocate(new_capacity);
}
RTC_DCHECK_LE(new_size, capacity());
}
void PacketArrivalTimeMap::Reallocate(int new_capacity) {
int new_capacity_minus_1 = new_capacity - 1;
// Check capacity is a power of 2.
RTC_DCHECK_EQ(new_capacity & new_capacity_minus_1, 0);
// Create uninitialized memory.
// All valid entries should be set by `AddPacket` before use.
void* raw = operator new[](new_capacity * sizeof(Timestamp));
Timestamp* new_buffer = static_cast<Timestamp*>(raw);
for (int64_t sequence_number = begin_sequence_number_;
sequence_number < end_sequence_number_; ++sequence_number) {
new_buffer[sequence_number & new_capacity_minus_1] =
arrival_times_[sequence_number & capacity_minus_1_];
}
arrival_times_.reset(new_buffer);
capacity_minus_1_ = new_capacity_minus_1;
} }
} // namespace webrtc } // namespace webrtc

View File

@ -10,9 +10,10 @@
#ifndef MODULES_REMOTE_BITRATE_ESTIMATOR_PACKET_ARRIVAL_MAP_H_ #ifndef MODULES_REMOTE_BITRATE_ESTIMATOR_PACKET_ARRIVAL_MAP_H_
#define MODULES_REMOTE_BITRATE_ESTIMATOR_PACKET_ARRIVAL_MAP_H_ #define MODULES_REMOTE_BITRATE_ESTIMATOR_PACKET_ARRIVAL_MAP_H_
#include <algorithm>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <deque> #include <memory>
#include "api/units/timestamp.h" #include "api/units/timestamp.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
@ -32,10 +33,19 @@ class PacketArrivalTimeMap {
public: public:
// Impossible to request feedback older than what can be represented by 15 // Impossible to request feedback older than what can be represented by 15
// bits. // bits.
static constexpr size_t kMaxNumberOfPackets = (1 << 15); static constexpr int kMaxNumberOfPackets = (1 << 15);
PacketArrivalTimeMap() = default;
PacketArrivalTimeMap(const PacketArrivalTimeMap&) = delete;
PacketArrivalTimeMap& operator=(const PacketArrivalTimeMap&) = delete;
~PacketArrivalTimeMap() = default;
// Indicates if the packet with `sequence_number` has already been received. // Indicates if the packet with `sequence_number` has already been received.
bool has_received(int64_t sequence_number) const; bool has_received(int64_t sequence_number) const {
return sequence_number >= begin_sequence_number() &&
sequence_number < end_sequence_number() &&
arrival_times_[Index(sequence_number)] >= Timestamp::Zero();
}
// Returns the sequence number of the first entry in the map, i.e. the // Returns the sequence number of the first entry in the map, i.e. the
// sequence number that a `begin()` iterator would represent. // sequence number that a `begin()` iterator would represent.
@ -43,21 +53,22 @@ class PacketArrivalTimeMap {
// Returns the sequence number of the element just after the map, i.e. the // Returns the sequence number of the element just after the map, i.e. the
// sequence number that an `end()` iterator would represent. // sequence number that an `end()` iterator would represent.
int64_t end_sequence_number() const { int64_t end_sequence_number() const { return end_sequence_number_; }
return begin_sequence_number_ + arrival_times_.size();
}
// Returns an element by `sequence_number`, which must be valid, i.e. // Returns an element by `sequence_number`, which must be valid, i.e.
// between [begin_sequence_number, end_sequence_number). // between [begin_sequence_number, end_sequence_number).
Timestamp get(int64_t sequence_number) { Timestamp get(int64_t sequence_number) {
int64_t pos = sequence_number - begin_sequence_number_; RTC_DCHECK_GE(sequence_number, begin_sequence_number());
RTC_DCHECK(pos >= 0 && pos < static_cast<int64_t>(arrival_times_.size())); RTC_DCHECK_LT(sequence_number, end_sequence_number());
return arrival_times_[pos]; return arrival_times_[Index(sequence_number)];
} }
// Clamps `sequence_number` between [begin_sequence_number, // Clamps `sequence_number` between [begin_sequence_number,
// end_sequence_number]. // end_sequence_number].
int64_t clamp(int64_t sequence_number) const; int64_t clamp(int64_t sequence_number) const {
return std::clamp(sequence_number, begin_sequence_number(),
end_sequence_number());
}
// Erases all elements from the beginning of the map until `sequence_number`. // Erases all elements from the beginning of the map until `sequence_number`.
void EraseTo(int64_t sequence_number); void EraseTo(int64_t sequence_number);
@ -71,17 +82,44 @@ class PacketArrivalTimeMap {
void RemoveOldPackets(int64_t sequence_number, Timestamp arrival_time_limit); void RemoveOldPackets(int64_t sequence_number, Timestamp arrival_time_limit);
private: private:
// Deque representing unwrapped sequence number -> time, where the index + static constexpr int kMinCapacity = 128;
// `begin_sequence_number_` represents the packet's sequence number.
std::deque<Timestamp> arrival_times_;
// The unwrapped sequence number for the first element in // Returns index in the `arrival_times_` for value for `sequence_number`.
// `arrival_times_`. int Index(int64_t sequence_number) const {
// Note that sequence_number might be negative, thus taking '%' requires
// extra handling and can be slow. Because capacity is a power of two, it
// is much faster to use '&' operator.
return sequence_number & capacity_minus_1_;
}
void SetNotReceived(int64_t begin_sequence_number_inclusive,
int64_t end_sequence_number_exclusive);
void TrimLeadingNotReceivedEntries();
// Adjust capacity to match new_size, may reduce capacity.
// On return guarantees capacity >= new_size.
void AdjustToSize(int new_size);
void Reallocate(int new_capacity);
int capacity() const { return capacity_minus_1_ + 1; }
bool has_seen_packet() const { return arrival_times_ != nullptr; }
// Circular buffer. Packet with sequence number `sequence_number`
// is stored in the slot `sequence_number % capacity_`
std::unique_ptr<Timestamp[]> arrival_times_ = nullptr;
// Allocated size of the `arrival_times_`
// capacity_ is a power of 2 in range [kMinCapacity, kMaxNumberOfPackets]
// `capacity - 1` is used much more often than `capacity`, thus that value is
// stored.
int capacity_minus_1_ = -1;
// The unwrapped sequence number for valid range of sequence numbers.
// arrival_times_ entries only valid for sequence numbers in range
// `begin_sequence_number_ <= sequence_number < end_sequence_number_`
int64_t begin_sequence_number_ = 0; int64_t begin_sequence_number_ = 0;
int64_t end_sequence_number_ = 0;
// Indicates if this map has had any packet added to it. The first packet
// decides the initial sequence number.
bool has_seen_packet_ = false;
}; };
} // namespace webrtc } // namespace webrtc

View File

@ -102,8 +102,7 @@ TEST(PacketArrivalMapTest, GrowsBufferAndRemoveOld) {
EXPECT_EQ(map.begin_sequence_number(), 43); EXPECT_EQ(map.begin_sequence_number(), 43);
EXPECT_EQ(map.end_sequence_number(), kLargeSeq + 1); EXPECT_EQ(map.end_sequence_number(), kLargeSeq + 1);
EXPECT_EQ(static_cast<size_t>(map.end_sequence_number() - EXPECT_EQ(map.end_sequence_number() - map.begin_sequence_number(),
map.begin_sequence_number()),
PacketArrivalTimeMap::kMaxNumberOfPackets); PacketArrivalTimeMap::kMaxNumberOfPackets);
EXPECT_FALSE(map.has_received(41)); EXPECT_FALSE(map.has_received(41));

View File

@ -82,7 +82,7 @@ void RemoteEstimatorProxy::MaybeCullOldPackets(int64_t sequence_number,
void RemoteEstimatorProxy::IncomingPacket(int64_t arrival_time_ms, void RemoteEstimatorProxy::IncomingPacket(int64_t arrival_time_ms,
size_t payload_size, size_t payload_size,
const RTPHeader& header) { const RTPHeader& header) {
if (arrival_time_ms < 0 || arrival_time_ms > kMaxTimeMs) { if (arrival_time_ms < 0 || arrival_time_ms >= kMaxTimeMs) {
RTC_LOG(LS_WARNING) << "Arrival time out of bounds: " << arrival_time_ms; RTC_LOG(LS_WARNING) << "Arrival time out of bounds: " << arrival_time_ms;
return; return;
} }
@ -292,7 +292,7 @@ RemoteEstimatorProxy::MaybeBuildFeedbackPacket(
for (int64_t seq = start_seq; seq < end_seq; ++seq) { for (int64_t seq = start_seq; seq < end_seq; ++seq) {
Timestamp arrival_time = packet_arrival_times_.get(seq); Timestamp arrival_time = packet_arrival_times_.get(seq);
if (arrival_time.IsInfinite()) { if (arrival_time < Timestamp::Zero()) {
// Packet not received. // Packet not received.
continue; continue;
} }