Cleanup of TransportFeedbackAdapter.

* Removes legacy defines from rtp_rtcp_defines.
* Simplifies the feedback adaptation logic, this is achieved
  by using the ability to preserve lost packets information
  from the RTCP message.
* Extracts in flight data tracking to a separate helper class.
* Removes legacy fields and constructors from the PacketFeedback
  structure.
* Removes the legacy GetTransportFeedbackVector method.

Apart from reducing total LOC, this prepares for moving the adaptation
to run on a TaskQueue.

Bug: webrtc:9883
Change-Id: I5ef4eace0948f119f283cd71dc2b8d0954a1449b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158781
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29674}
This commit is contained in:
Sebastian Jansson
2019-10-31 19:08:33 +01:00
committed by Commit Bot
parent c3d1f9b0cd
commit 26452ff7db
12 changed files with 309 additions and 474 deletions

View File

@ -128,11 +128,6 @@ class RtpTransportControllerSendInterface {
virtual void SetQueueTimeLimit(int limit_ms) = 0;
virtual StreamFeedbackProvider* GetStreamFeedbackProvider() = 0;
// DEPRECATED, use GetStreamFeedbackProvider instead.
virtual void RegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {}
virtual void DeRegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {}
virtual void RegisterTargetTransferRateObserver(
TargetTransferRateObserver* observer) = 0;
virtual void OnNetworkRouteChanged(

View File

@ -68,8 +68,6 @@ if (rtc_include_tests) {
testonly = true
sources = [
"congestion_controller_unittests_helper.cc",
"congestion_controller_unittests_helper.h",
"transport_feedback_adapter_unittest.cc",
]
deps = [

View File

@ -1,45 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/congestion_controller/rtp/congestion_controller_unittests_helper.h"
#include <stddef.h>
#include <stdint.h>
#include "rtc_base/checks.h"
#include "test/gtest.h"
namespace webrtc {
void ComparePacketFeedbackVectors(const std::vector<PacketFeedback>& truth,
const std::vector<PacketFeedback>& input) {
ASSERT_EQ(truth.size(), input.size());
size_t len = truth.size();
// truth contains the input data for the test, and input is what will be
// sent to the bandwidth estimator. truth.arrival_tims_ms is used to
// populate the transport feedback messages. As these times may be changed
// (because of resolution limits in the packets, and because of the time
// base adjustment performed by the TransportFeedbackAdapter at the first
// packet, the truth[x].arrival_time and input[x].arrival_time may not be
// equal. However, the difference must be the same for all x.
int64_t arrival_time_delta =
truth[0].arrival_time_ms - input[0].arrival_time_ms;
for (size_t i = 0; i < len; ++i) {
RTC_CHECK(truth[i].arrival_time_ms != PacketFeedback::kNotReceived);
if (input[i].arrival_time_ms != PacketFeedback::kNotReceived) {
EXPECT_EQ(truth[i].arrival_time_ms,
input[i].arrival_time_ms + arrival_time_delta);
}
EXPECT_EQ(truth[i].send_time_ms, input[i].send_time_ms);
EXPECT_EQ(truth[i].sequence_number, input[i].sequence_number);
EXPECT_EQ(truth[i].payload_size, input[i].payload_size);
EXPECT_EQ(truth[i].pacing_info, input[i].pacing_info);
}
}
} // namespace webrtc

View File

@ -1,23 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_CONGESTION_CONTROLLER_RTP_CONGESTION_CONTROLLER_UNITTESTS_HELPER_H_
#define MODULES_CONGESTION_CONTROLLER_RTP_CONGESTION_CONTROLLER_UNITTESTS_HELPER_H_
#include <vector>
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc {
void ComparePacketFeedbackVectors(const std::vector<PacketFeedback>& truth,
const std::vector<PacketFeedback>& input);
} // namespace webrtc
#endif // MODULES_CONGESTION_CONTROLLER_RTP_CONGESTION_CONTROLLER_UNITTESTS_HELPER_H_

View File

@ -35,7 +35,7 @@ PacketResult NetworkPacketFeedbackFromRtpPacketFeedback(
} else {
feedback.receive_time = Timestamp::ms(pf.arrival_time_ms);
}
feedback.sent_packet.sequence_number = pf.long_sequence_number;
feedback.sent_packet.sequence_number = pf.sequence_number;
feedback.sent_packet.send_time = Timestamp::ms(pf.send_time_ms);
feedback.sent_packet.size = DataSize::bytes(pf.payload_size);
feedback.sent_packet.pacing_info = pf.pacing_info;
@ -47,6 +47,41 @@ PacketResult NetworkPacketFeedbackFromRtpPacketFeedback(
const int64_t kNoTimestamp = -1;
const int64_t kSendTimeHistoryWindowMs = 60000;
void InFlightBytesTracker::AddInFlightPacketBytes(
const PacketFeedback& packet) {
RTC_DCHECK_NE(packet.send_time_ms, -1);
auto it = in_flight_bytes_.find({packet.local_net_id, packet.remote_net_id});
if (it != in_flight_bytes_.end()) {
it->second += packet.payload_size;
} else {
in_flight_bytes_[{packet.local_net_id, packet.remote_net_id}] =
packet.payload_size;
}
}
void InFlightBytesTracker::RemoveInFlightPacketBytes(
const PacketFeedback& packet) {
if (packet.send_time_ms < 0)
return;
auto it = in_flight_bytes_.find({packet.local_net_id, packet.remote_net_id});
if (it != in_flight_bytes_.end()) {
it->second -= packet.payload_size;
if (it->second == 0)
in_flight_bytes_.erase(it);
}
}
DataSize InFlightBytesTracker::GetOutstandingData(
uint16_t local_net_id,
uint16_t remote_net_id) const {
auto it = in_flight_bytes_.find({local_net_id, remote_net_id});
if (it != in_flight_bytes_.end()) {
return DataSize::bytes(it->second);
} else {
return DataSize::Zero();
}
}
TransportFeedbackAdapter::TransportFeedbackAdapter()
: packet_age_limit_ms_(kSendTimeHistoryWindowMs),
current_offset_ms_(kNoTimestamp),
@ -84,25 +119,28 @@ void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
Timestamp creation_time) {
{
rtc::CritScope cs(&lock_);
PacketFeedback packet(creation_time.ms(),
packet_info.transport_sequence_number,
packet_info.length + overhead_bytes, local_net_id_,
remote_net_id_, packet_info.pacing_info);
PacketFeedback packet;
packet.creation_time_ms = creation_time.ms();
packet.sequence_number =
seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number);
packet.payload_size = packet_info.length + overhead_bytes;
packet.local_net_id = local_net_id_;
packet.remote_net_id = remote_net_id_;
packet.pacing_info = packet_info.pacing_info;
if (packet_info.has_rtp_sequence_number) {
packet.ssrc = packet_info.ssrc;
packet.rtp_sequence_number = packet_info.rtp_sequence_number;
}
packet.long_sequence_number =
seq_num_unwrapper_.Unwrap(packet.sequence_number);
while (!history_.empty() &&
creation_time.ms() - history_.begin()->second.creation_time_ms >
packet_age_limit_ms_) {
// TODO(sprang): Warn if erasing (too many) old items?
RemoveInFlightPacketBytes(history_.begin()->second);
if (history_.begin()->second.sequence_number > last_ack_seq_num_)
in_flight_.RemoveInFlightPacketBytes(history_.begin()->second);
history_.erase(history_.begin());
}
history_.insert(std::make_pair(packet.long_sequence_number, packet));
history_.insert(std::make_pair(packet.sequence_number, packet));
}
}
absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
@ -129,12 +167,13 @@ absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
pending_untracked_size_ = 0;
}
if (!packet_retransmit) {
AddInFlightPacketBytes(it->second);
if (it->second.sequence_number > last_ack_seq_num_)
in_flight_.AddInFlightPacketBytes(it->second);
auto packet = it->second;
SentPacket msg;
msg.size = DataSize::bytes(packet.payload_size);
msg.send_time = Timestamp::ms(packet.send_time_ms);
msg.sequence_number = packet.long_sequence_number;
msg.sequence_number = packet.sequence_number;
msg.prior_unacked_data = DataSize::bytes(packet.unacknowledged_data);
msg.data_in_flight = GetOutstandingData();
return msg;
@ -155,71 +194,38 @@ absl::optional<TransportPacketsFeedback>
TransportFeedbackAdapter::ProcessTransportFeedback(
const rtcp::TransportFeedback& feedback,
Timestamp feedback_receive_time) {
DataSize prior_in_flight = GetOutstandingData();
last_packet_feedback_vector_ =
GetPacketFeedbackVector(feedback, feedback_receive_time);
{
rtc::CritScope cs(&observers_lock_);
for (auto& observer : observers_) {
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
for (const auto& packet : last_packet_feedback_vector_) {
if (packet.ssrc && absl::c_count(observer.first, *packet.ssrc) > 0) {
// If we found the ssrc, it means the the packet was in the
// history and we expect the the send time has been set. A reason why
// this would be false would be if ProcessTransportFeedback covering a
// packet would be called before ProcessSentPacket for the same
// packet. This should not happen if we handle ordering of events
// correctly.
// TODO(srte): Fix the tests that makes this happen and make this a
// DCHECK.
if (packet.send_time_ms == PacketFeedback::kNoSendTime) {
RTC_LOG(LS_ERROR)
<< "Received feedback before packet was indicated as sent";
continue;
}
StreamFeedbackObserver::StreamPacketInfo feedback_info;
feedback_info.ssrc = *packet.ssrc;
feedback_info.rtp_sequence_number = packet.rtp_sequence_number;
feedback_info.received =
packet.arrival_time_ms != PacketFeedback::kNotReceived;
selected_feedback.push_back(feedback_info);
}
}
if (!selected_feedback.empty()) {
observer.second->OnPacketFeedbackVector(std::move(selected_feedback));
}
}
}
std::vector<PacketFeedback> feedback_vector = last_packet_feedback_vector_;
if (feedback_vector.empty())
if (feedback.GetPacketStatusCount() == 0) {
RTC_LOG(LS_INFO) << "Empty transport feedback packet received.";
return absl::nullopt;
TransportPacketsFeedback msg;
for (const PacketFeedback& rtp_feedback : feedback_vector) {
if (rtp_feedback.send_time_ms != PacketFeedback::kNoSendTime) {
auto feedback = NetworkPacketFeedbackFromRtpPacketFeedback(rtp_feedback);
msg.packet_feedbacks.push_back(feedback);
} else if (rtp_feedback.arrival_time_ms == PacketFeedback::kNotReceived) {
msg.sendless_arrival_times.push_back(Timestamp::PlusInfinity());
} else {
msg.sendless_arrival_times.push_back(
Timestamp::ms(rtp_feedback.arrival_time_ms));
}
}
std::vector<PacketFeedback> feedback_vector;
TransportPacketsFeedback msg;
msg.feedback_time = feedback_receive_time;
{
rtc::CritScope cs(&lock_);
msg.prior_in_flight =
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
feedback_vector =
ProcessTransportFeedbackInner(feedback, feedback_receive_time);
last_packet_feedback_vector_ = feedback_vector;
if (feedback_vector.empty())
return absl::nullopt;
for (const PacketFeedback& rtp_feedback : feedback_vector) {
msg.packet_feedbacks.push_back(
NetworkPacketFeedbackFromRtpPacketFeedback(rtp_feedback));
}
auto it = history_.find(last_ack_seq_num_);
if (it != history_.end() &&
it->second.send_time_ms != PacketFeedback::kNoSendTime) {
msg.first_unacked_send_time = Timestamp::ms(it->second.send_time_ms);
}
msg.data_in_flight =
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
}
msg.feedback_time = feedback_receive_time;
msg.prior_in_flight = prior_in_flight;
msg.data_in_flight = GetOutstandingData();
SignalObservers(feedback_vector);
return msg;
}
@ -232,15 +238,11 @@ void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id,
DataSize TransportFeedbackAdapter::GetOutstandingData() const {
rtc::CritScope cs(&lock_);
auto it = in_flight_bytes_.find({local_net_id_, remote_net_id_});
if (it != in_flight_bytes_.end()) {
return DataSize::bytes(it->second);
} else {
return DataSize::Zero();
}
return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
}
std::vector<PacketFeedback> TransportFeedbackAdapter::GetPacketFeedbackVector(
std::vector<PacketFeedback>
TransportFeedbackAdapter::ProcessTransportFeedbackInner(
const rtcp::TransportFeedback& feedback,
Timestamp feedback_time) {
// Add timestamp deltas to a local time base selected on first packet arrival.
@ -254,116 +256,102 @@ std::vector<PacketFeedback> TransportFeedbackAdapter::GetPacketFeedbackVector(
last_timestamp_us_ = feedback.GetBaseTimeUs();
std::vector<PacketFeedback> packet_feedback_vector;
if (feedback.GetPacketStatusCount() == 0) {
RTC_LOG(LS_INFO) << "Empty transport feedback packet received.";
return packet_feedback_vector;
}
packet_feedback_vector.reserve(feedback.GetPacketStatusCount());
{
rtc::CritScope cs(&lock_);
size_t failed_lookups = 0;
int64_t offset_us = 0;
int64_t timestamp_ms = 0;
uint16_t seq_num = feedback.GetBaseSequence();
for (const auto& packet : feedback.GetReceivedPackets()) {
// Insert into the vector those unreceived packets which precede this
// iteration's received packet.
for (; seq_num != packet.sequence_number(); ++seq_num) {
PacketFeedback packet_feedback(PacketFeedback::kNotReceived, seq_num);
// Note: Element not removed from history because it might be reported
// as received by another feedback.
if (!GetFeedback(&packet_feedback, false))
++failed_lookups;
if (packet_feedback.local_net_id == local_net_id_ &&
packet_feedback.remote_net_id == remote_net_id_) {
packet_feedback_vector.push_back(packet_feedback);
}
}
// Handle this iteration's received packet.
offset_us += packet.delta_us();
timestamp_ms = current_offset_ms_ + (offset_us / 1000);
PacketFeedback packet_feedback(timestamp_ms, packet.sequence_number());
if (!GetFeedback(&packet_feedback, true))
++failed_lookups;
if (packet_feedback.local_net_id == local_net_id_ &&
packet_feedback.remote_net_id == remote_net_id_) {
packet_feedback_vector.push_back(packet_feedback);
}
size_t failed_lookups = 0;
size_t ignored = 0;
int64_t offset_us = 0;
for (const auto& packet : feedback.GetAllPackets()) {
int64_t seq_num = seq_num_unwrapper_.Unwrap(packet.sequence_number());
++seq_num;
if (seq_num > last_ack_seq_num_) {
// Starts at history_.begin() if last_ack_seq_num_ < 0, since any valid
// sequence number is >= 0.
for (auto it = history_.upper_bound(last_ack_seq_num_);
it != history_.upper_bound(seq_num); ++it) {
in_flight_.RemoveInFlightPacketBytes(it->second);
}
last_ack_seq_num_ = seq_num;
}
if (failed_lookups > 0) {
RTC_LOG(LS_WARNING) << "Failed to lookup send time for " << failed_lookups
<< " packet" << (failed_lookups > 1 ? "s" : "")
<< ". Send time history too small?";
auto it = history_.find(seq_num);
if (it == history_.end()) {
++failed_lookups;
continue;
}
if (it->second.send_time_ms == PacketFeedback::kNoSendTime) {
// TODO(srte): Fix the tests that makes this happen and make this a
// DCHECK.
RTC_DLOG(LS_ERROR)
<< "Received feedback before packet was indicated as sent";
continue;
}
PacketFeedback packet_feedback = it->second;
if (!packet.received()) {
// Note: Element not removed from history because it might be reported
// as received by another feedback.
packet_feedback.arrival_time_ms = PacketFeedback::kNotReceived;
} else {
offset_us += packet.delta_us();
packet_feedback.arrival_time_ms = current_offset_ms_ + (offset_us / 1000);
history_.erase(it);
}
if (packet_feedback.local_net_id == local_net_id_ &&
packet_feedback.remote_net_id == remote_net_id_) {
packet_feedback_vector.push_back(packet_feedback);
} else {
++ignored;
}
}
if (failed_lookups > 0) {
RTC_LOG(LS_WARNING) << "Failed to lookup send time for " << failed_lookups
<< " packet" << (failed_lookups > 1 ? "s" : "")
<< ". Send time history too small?";
}
if (ignored > 0) {
RTC_LOG(LS_INFO) << "Ignoring " << ignored
<< " packets because they were sent on a different route.";
}
return packet_feedback_vector;
}
void TransportFeedbackAdapter::SignalObservers(
const std::vector<PacketFeedback>& feedback_vector) {
rtc::CritScope cs(&observers_lock_);
for (auto& observer : observers_) {
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
for (const auto& packet : feedback_vector) {
if (packet.ssrc && absl::c_count(observer.first, *packet.ssrc) > 0) {
// If we found the ssrc, it means the the packet was in the
// history and we expect the the send time has been set. A reason why
// this would be false would be if ProcessTransportFeedback covering a
// packet would be called before ProcessSentPacket for the same
// packet. This should not happen if we handle ordering of events
// correctly.
RTC_DCHECK_NE(packet.send_time_ms, PacketFeedback::kNoSendTime);
StreamFeedbackObserver::StreamPacketInfo packet_info;
packet_info.ssrc = *packet.ssrc;
packet_info.rtp_sequence_number = packet.rtp_sequence_number;
packet_info.received =
packet.arrival_time_ms != PacketFeedback::kNotReceived;
selected_feedback.push_back(packet_info);
}
}
if (!selected_feedback.empty()) {
observer.second->OnPacketFeedbackVector(std::move(selected_feedback));
}
}
}
std::vector<PacketFeedback>
TransportFeedbackAdapter::GetTransportFeedbackVector() const {
rtc::CritScope cs(&lock_);
return last_packet_feedback_vector_;
}
bool TransportFeedbackAdapter::GetFeedback(PacketFeedback* packet_feedback,
bool remove) {
RTC_DCHECK(packet_feedback);
int64_t acked_seq_num =
seq_num_unwrapper_.Unwrap(packet_feedback->sequence_number);
if (acked_seq_num > last_ack_seq_num_) {
// Returns history_.begin() if last_ack_seq_num_ < 0, since any valid
// sequence number is >= 0.
auto unacked_it = history_.lower_bound(last_ack_seq_num_);
auto newly_acked_end = history_.upper_bound(acked_seq_num);
for (; unacked_it != newly_acked_end; ++unacked_it) {
RemoveInFlightPacketBytes(unacked_it->second);
}
last_ack_seq_num_ = acked_seq_num;
}
auto it = history_.find(acked_seq_num);
if (it == history_.end())
return false;
// Save arrival_time not to overwrite it.
int64_t arrival_time_ms = packet_feedback->arrival_time_ms;
*packet_feedback = it->second;
packet_feedback->arrival_time_ms = arrival_time_ms;
if (remove)
history_.erase(it);
return true;
}
void TransportFeedbackAdapter::AddInFlightPacketBytes(
const PacketFeedback& packet) {
RTC_DCHECK_NE(packet.send_time_ms, -1);
if (last_ack_seq_num_ >= packet.long_sequence_number)
return;
auto it = in_flight_bytes_.find({packet.local_net_id, packet.remote_net_id});
if (it != in_flight_bytes_.end()) {
it->second += packet.payload_size;
} else {
in_flight_bytes_[{packet.local_net_id, packet.remote_net_id}] =
packet.payload_size;
}
}
void TransportFeedbackAdapter::RemoveInFlightPacketBytes(
const PacketFeedback& packet) {
if (packet.send_time_ms < 0 ||
last_ack_seq_num_ >= packet.long_sequence_number)
return;
auto it = in_flight_bytes_.find({packet.local_net_id, packet.remote_net_id});
if (it != in_flight_bytes_.end()) {
it->second -= packet.payload_size;
if (it->second == 0)
in_flight_bytes_.erase(it);
}
}
} // namespace webrtc

View File

@ -26,6 +26,55 @@
namespace webrtc {
struct PacketFeedback {
PacketFeedback() = default;
static constexpr int kNotAProbe = -1;
static constexpr int64_t kNotReceived = -1;
static constexpr int64_t kNoSendTime = -1;
static constexpr int64_t kNoCreationTime = -1;
// NOTE! The variable |creation_time_ms| is not used when testing equality.
// This is due to |creation_time_ms| only being used by SendTimeHistory
// for book-keeping, and is of no interest outside that class.
// TODO(philipel): Remove |creation_time_ms| from PacketFeedback when cleaning
// up SendTimeHistory.
// Time corresponding to when this object was created.
int64_t creation_time_ms = kNoCreationTime;
// Time corresponding to when the packet was received. Timestamped with the
// receiver's clock. For unreceived packet, the sentinel value kNotReceived
// is used.
int64_t arrival_time_ms = kNotReceived;
// Time corresponding to when the packet was sent, timestamped with the
// sender's clock.
int64_t send_time_ms = kNoSendTime;
// Session unique packet identifier, incremented with 1 for every packet
// generated by the sender.
int64_t sequence_number = 0;
// Size of the packet excluding RTP headers.
size_t payload_size = 0;
// Size of preceeding packets that are not part of feedback.
size_t unacknowledged_data = 0;
// The network route ids that this packet is associated with.
uint16_t local_net_id = 0;
uint16_t remote_net_id = 0;
// Pacing information about this packet.
PacedPacketInfo pacing_info;
// The SSRC and RTP sequence number of the packet this feedback refers to.
absl::optional<uint32_t> ssrc;
uint16_t rtp_sequence_number = 0;
};
class InFlightBytesTracker {
public:
void AddInFlightPacketBytes(const PacketFeedback& packet);
void RemoveInFlightPacketBytes(const PacketFeedback& packet);
DataSize GetOutstandingData(uint16_t local_net_id,
uint16_t remote_net_id) const;
private:
using RemoteAndLocalNetworkId = std::pair<uint16_t, uint16_t>;
std::map<RemoteAndLocalNetworkId, size_t> in_flight_bytes_;
};
class TransportFeedbackAdapter : public StreamFeedbackProvider {
public:
TransportFeedbackAdapter();
@ -54,24 +103,16 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider {
DataSize GetOutstandingData() const;
private:
using RemoteAndLocalNetworkId = std::pair<uint16_t, uint16_t>;
enum class SendTimeHistoryStatus { kNotAdded, kOk, kDuplicate };
void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
std::vector<PacketFeedback> GetPacketFeedbackVector(
std::vector<PacketFeedback> ProcessTransportFeedbackInner(
const rtcp::TransportFeedback& feedback,
Timestamp feedback_time);
Timestamp feedback_time) RTC_RUN_ON(&lock_);
// Look up PacketFeedback for a sent packet, based on the sequence number, and
// populate all fields except for arrival_time. The packet parameter must
// thus be non-null and have the sequence_number field set.
bool GetFeedback(PacketFeedback* packet_feedback, bool remove)
RTC_RUN_ON(&lock_);
void AddInFlightPacketBytes(const PacketFeedback& packet) RTC_RUN_ON(&lock_);
void RemoveInFlightPacketBytes(const PacketFeedback& packet)
RTC_RUN_ON(&lock_);
void SignalObservers(
const std::vector<PacketFeedback>& packet_feedback_vector);
rtc::CriticalSection lock_;
@ -85,12 +126,12 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider {
// Sequence numbers are never negative, using -1 as it always < a real
// sequence number.
int64_t last_ack_seq_num_ RTC_GUARDED_BY(&lock_) = -1;
std::map<RemoteAndLocalNetworkId, size_t> in_flight_bytes_
RTC_GUARDED_BY(&lock_);
InFlightBytesTracker in_flight_ RTC_GUARDED_BY(&lock_);
int64_t current_offset_ms_;
int64_t last_timestamp_us_;
std::vector<PacketFeedback> last_packet_feedback_vector_;
int64_t current_offset_ms_ RTC_GUARDED_BY(&lock_);
int64_t last_timestamp_us_ RTC_GUARDED_BY(&lock_);
std::vector<PacketFeedback> last_packet_feedback_vector_
RTC_GUARDED_BY(&lock_);
uint16_t local_net_id_ RTC_GUARDED_BY(&lock_);
uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_);

View File

@ -14,7 +14,6 @@
#include <memory>
#include <vector>
#include "modules/congestion_controller/rtp/congestion_controller_unittests_helper.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "rtc_base/checks.h"
@ -36,6 +35,47 @@ const PacedPacketInfo kPacingInfo1(1, 8, 4000);
const PacedPacketInfo kPacingInfo2(2, 14, 7000);
const PacedPacketInfo kPacingInfo3(3, 20, 10000);
const PacedPacketInfo kPacingInfo4(4, 22, 10000);
void ComparePacketFeedbackVectors(const std::vector<PacketFeedback>& truth,
const std::vector<PacketFeedback>& input) {
ASSERT_EQ(truth.size(), input.size());
size_t len = truth.size();
// truth contains the input data for the test, and input is what will be
// sent to the bandwidth estimator. truth.arrival_tims_ms is used to
// populate the transport feedback messages. As these times may be changed
// (because of resolution limits in the packets, and because of the time
// base adjustment performed by the TransportFeedbackAdapter at the first
// packet, the truth[x].arrival_time and input[x].arrival_time may not be
// equal. However, the difference must be the same for all x.
int64_t arrival_time_delta =
truth[0].arrival_time_ms - input[0].arrival_time_ms;
for (size_t i = 0; i < len; ++i) {
RTC_CHECK(truth[i].arrival_time_ms != PacketFeedback::kNotReceived);
if (input[i].arrival_time_ms != PacketFeedback::kNotReceived) {
EXPECT_EQ(truth[i].arrival_time_ms,
input[i].arrival_time_ms + arrival_time_delta);
}
EXPECT_EQ(truth[i].send_time_ms, input[i].send_time_ms);
EXPECT_EQ(truth[i].sequence_number, input[i].sequence_number);
EXPECT_EQ(truth[i].payload_size, input[i].payload_size);
EXPECT_EQ(truth[i].pacing_info, input[i].pacing_info);
}
}
PacketFeedback CreatePacketFeedback(int64_t arrival_time_ms,
int64_t send_time_ms,
int64_t sequence_number,
size_t payload_size,
const PacedPacketInfo& pacing_info) {
PacketFeedback res;
res.arrival_time_ms = arrival_time_ms;
res.send_time_ms = send_time_ms;
res.sequence_number = sequence_number;
res.payload_size = payload_size;
res.pacing_info = pacing_info;
return res;
}
} // namespace
namespace test {
@ -85,25 +125,14 @@ class TransportFeedbackAdapterTest : public ::testing::Test {
};
TEST_F(TransportFeedbackAdapterTest, ObserverSanity) {
const uint32_t kSsrc = 8832;
MockStreamFeedbackObserver mock;
adapter_->RegisterStreamFeedbackObserver({kSsrc}, &mock);
const std::vector<PacketFeedback> packets = {
PacketFeedback(100, 200, 0, 1000, kPacingInfo0),
PacketFeedback(110, 210, 1, 2000, kPacingInfo0),
PacketFeedback(120, 220, 2, 3000, kPacingInfo0)};
for (auto& packet : packets) {
const size_t kOverhead = 40;
RtpPacketSendInfo send_info;
send_info.ssrc = kSsrc;
send_info.pacing_info = packet.pacing_info;
send_info.has_rtp_sequence_number = true;
send_info.length = packet.payload_size;
send_info.rtp_sequence_number = packet.rtp_sequence_number;
send_info.rtp_sequence_number = packet.sequence_number;
adapter_->AddPacket(send_info, kOverhead, clock_.CurrentTime());
}
CreatePacketFeedback(100, 200, 0, 1000, kPacingInfo0),
CreatePacketFeedback(110, 210, 1, 2000, kPacingInfo0),
CreatePacketFeedback(120, 220, 2, 3000, kPacingInfo0)};
rtcp::TransportFeedback feedback;
feedback.SetBase(packets[0].sequence_number,
packets[0].arrival_time_ms * 1000);
@ -120,14 +149,15 @@ TEST_F(TransportFeedbackAdapterTest, ObserverSanity) {
adapter_->DeRegisterStreamFeedbackObserver(&mock);
const PacketFeedback new_packet(130, 230, 3, 4000, kPacingInfo0);
const PacketFeedback new_packet =
CreatePacketFeedback(130, 230, 3, 4000, kPacingInfo0);
OnSentPacket(new_packet);
rtcp::TransportFeedback second_feedback;
second_feedback.SetBase(new_packet.sequence_number,
new_packet.arrival_time_ms * 1000);
EXPECT_TRUE(feedback.AddReceivedPacket(new_packet.sequence_number,
new_packet.arrival_time_ms * 1000));
EXPECT_TRUE(second_feedback.AddReceivedPacket(
new_packet.sequence_number, new_packet.arrival_time_ms * 1000));
EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0);
adapter_->ProcessTransportFeedback(
second_feedback, Timestamp::ms(clock_.TimeInMilliseconds()));
@ -151,11 +181,11 @@ TEST_F(TransportFeedbackAdapterTest, ObserverMissingDeRegistrationDeathTest) {
TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) {
std::vector<PacketFeedback> packets;
packets.push_back(PacketFeedback(100, 200, 0, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(110, 210, 1, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(120, 220, 2, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(130, 230, 3, 1500, kPacingInfo1));
packets.push_back(PacketFeedback(140, 240, 4, 1500, kPacingInfo1));
packets.push_back(CreatePacketFeedback(100, 200, 0, 1500, kPacingInfo0));
packets.push_back(CreatePacketFeedback(110, 210, 1, 1500, kPacingInfo0));
packets.push_back(CreatePacketFeedback(120, 220, 2, 1500, kPacingInfo0));
packets.push_back(CreatePacketFeedback(130, 230, 3, 1500, kPacingInfo1));
packets.push_back(CreatePacketFeedback(140, 240, 4, 1500, kPacingInfo1));
for (const PacketFeedback& packet : packets)
OnSentPacket(packet);
@ -178,13 +208,13 @@ TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) {
TEST_F(TransportFeedbackAdapterTest, FeedbackVectorReportsUnreceived) {
std::vector<PacketFeedback> sent_packets = {
PacketFeedback(100, 220, 0, 1500, kPacingInfo0),
PacketFeedback(110, 210, 1, 1500, kPacingInfo0),
PacketFeedback(120, 220, 2, 1500, kPacingInfo0),
PacketFeedback(130, 230, 3, 1500, kPacingInfo0),
PacketFeedback(140, 240, 4, 1500, kPacingInfo0),
PacketFeedback(150, 250, 5, 1500, kPacingInfo0),
PacketFeedback(160, 260, 6, 1500, kPacingInfo0)};
CreatePacketFeedback(100, 220, 0, 1500, kPacingInfo0),
CreatePacketFeedback(110, 210, 1, 1500, kPacingInfo0),
CreatePacketFeedback(120, 220, 2, 1500, kPacingInfo0),
CreatePacketFeedback(130, 230, 3, 1500, kPacingInfo0),
CreatePacketFeedback(140, 240, 4, 1500, kPacingInfo0),
CreatePacketFeedback(150, 250, 5, 1500, kPacingInfo0),
CreatePacketFeedback(160, 260, 6, 1500, kPacingInfo0)};
for (const PacketFeedback& packet : sent_packets)
OnSentPacket(packet);
@ -213,11 +243,11 @@ TEST_F(TransportFeedbackAdapterTest, FeedbackVectorReportsUnreceived) {
TEST_F(TransportFeedbackAdapterTest, HandlesDroppedPackets) {
std::vector<PacketFeedback> packets;
packets.push_back(PacketFeedback(100, 200, 0, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(110, 210, 1, 1500, kPacingInfo1));
packets.push_back(PacketFeedback(120, 220, 2, 1500, kPacingInfo2));
packets.push_back(PacketFeedback(130, 230, 3, 1500, kPacingInfo3));
packets.push_back(PacketFeedback(140, 240, 4, 1500, kPacingInfo4));
packets.push_back(CreatePacketFeedback(100, 200, 0, 1500, kPacingInfo0));
packets.push_back(CreatePacketFeedback(110, 210, 1, 1500, kPacingInfo1));
packets.push_back(CreatePacketFeedback(120, 220, 2, 1500, kPacingInfo2));
packets.push_back(CreatePacketFeedback(130, 230, 3, 1500, kPacingInfo3));
packets.push_back(CreatePacketFeedback(140, 240, 4, 1500, kPacingInfo4));
const uint16_t kSendSideDropBefore = 1;
const uint16_t kReceiveSideDropAfter = 3;
@ -241,14 +271,11 @@ TEST_F(TransportFeedbackAdapterTest, HandlesDroppedPackets) {
feedback.Build();
std::vector<PacketFeedback> expected_packets(
packets.begin(), packets.begin() + kReceiveSideDropAfter + 1);
packets.begin() + kSendSideDropBefore,
packets.begin() + kReceiveSideDropAfter + 1);
// Packets that have timed out on the send-side have lost the
// information stored on the send-side.
for (size_t i = 0; i < kSendSideDropBefore; ++i) {
expected_packets[i].send_time_ms = -1;
expected_packets[i].payload_size = 0;
expected_packets[i].pacing_info = PacedPacketInfo();
}
// information stored on the send-side. And they will not be reported to
// observers since we won't know that they come from the same networks.
adapter_->ProcessTransportFeedback(
feedback, Timestamp::ms(clock_.TimeInMilliseconds()));
@ -261,12 +288,12 @@ TEST_F(TransportFeedbackAdapterTest, SendTimeWrapsBothWays) {
static_cast<int64_t>(1 << 8) *
static_cast<int64_t>((1 << 23) - 1) / 1000;
std::vector<PacketFeedback> packets;
packets.push_back(
PacketFeedback(kHighArrivalTimeMs - 64, 200, 0, 1500, PacedPacketInfo()));
packets.push_back(
PacketFeedback(kHighArrivalTimeMs + 64, 210, 1, 1500, PacedPacketInfo()));
packets.push_back(
PacketFeedback(kHighArrivalTimeMs, 220, 2, 1500, PacedPacketInfo()));
packets.push_back(CreatePacketFeedback(kHighArrivalTimeMs - 64, 200, 0, 1500,
PacedPacketInfo()));
packets.push_back(CreatePacketFeedback(kHighArrivalTimeMs + 64, 210, 1, 1500,
PacedPacketInfo()));
packets.push_back(CreatePacketFeedback(kHighArrivalTimeMs, 220, 2, 1500,
PacedPacketInfo()));
for (const PacketFeedback& packet : packets)
OnSentPacket(packet);
@ -296,9 +323,9 @@ TEST_F(TransportFeedbackAdapterTest, SendTimeWrapsBothWays) {
TEST_F(TransportFeedbackAdapterTest, HandlesArrivalReordering) {
std::vector<PacketFeedback> packets;
packets.push_back(PacketFeedback(120, 200, 0, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(110, 210, 1, 1500, kPacingInfo0));
packets.push_back(PacketFeedback(100, 220, 2, 1500, kPacingInfo0));
packets.push_back(CreatePacketFeedback(120, 200, 0, 1500, kPacingInfo0));
packets.push_back(CreatePacketFeedback(110, 210, 1, 1500, kPacingInfo0));
packets.push_back(CreatePacketFeedback(100, 220, 2, 1500, kPacingInfo0));
for (const PacketFeedback& packet : packets)
OnSentPacket(packet);
@ -333,8 +360,11 @@ TEST_F(TransportFeedbackAdapterTest, TimestampDeltas) {
rtcp::TransportFeedback::kDeltaScaleFactor *
std::numeric_limits<int16_t>::min();
PacketFeedback packet_feedback(100, 200, 0, 1500, true, 0, 0,
PacedPacketInfo());
PacketFeedback packet_feedback;
packet_feedback.sequence_number = 1;
packet_feedback.send_time_ms = 100;
packet_feedback.arrival_time_ms = 200;
packet_feedback.payload_size = 1500;
sent_packets.push_back(packet_feedback);
packet_feedback.send_time_ms += kSmallDeltaUs / 1000;
@ -409,7 +439,8 @@ TEST_F(TransportFeedbackAdapterTest, TimestampDeltas) {
}
TEST_F(TransportFeedbackAdapterTest, IgnoreDuplicatePacketSentCalls) {
const PacketFeedback packet(100, 200, 0, 1500, kPacingInfo0);
const PacketFeedback packet =
CreatePacketFeedback(100, 200, 0, 1500, kPacingInfo0);
// Add a packet and then mark it as sent.
RtpPacketSendInfo packet_info;

View File

@ -44,78 +44,6 @@ bool IsLegalRsidName(absl::string_view name) {
StreamDataCounters::StreamDataCounters() : first_packet_time_ms(-1) {}
PacketFeedback::PacketFeedback(int64_t arrival_time_ms,
uint16_t sequence_number)
: PacketFeedback(-1,
arrival_time_ms,
kNoSendTime,
sequence_number,
0,
0,
0,
PacedPacketInfo()) {}
PacketFeedback::PacketFeedback(int64_t arrival_time_ms,
int64_t send_time_ms,
uint16_t sequence_number,
size_t payload_size,
const PacedPacketInfo& pacing_info)
: PacketFeedback(-1,
arrival_time_ms,
send_time_ms,
sequence_number,
payload_size,
0,
0,
pacing_info) {}
PacketFeedback::PacketFeedback(int64_t creation_time_ms,
uint16_t sequence_number,
size_t payload_size,
uint16_t local_net_id,
uint16_t remote_net_id,
const PacedPacketInfo& pacing_info)
: PacketFeedback(creation_time_ms,
kNotReceived,
kNoSendTime,
sequence_number,
payload_size,
local_net_id,
remote_net_id,
pacing_info) {}
PacketFeedback::PacketFeedback(int64_t creation_time_ms,
int64_t arrival_time_ms,
int64_t send_time_ms,
uint16_t sequence_number,
size_t payload_size,
uint16_t local_net_id,
uint16_t remote_net_id,
const PacedPacketInfo& pacing_info)
: creation_time_ms(creation_time_ms),
arrival_time_ms(arrival_time_ms),
send_time_ms(send_time_ms),
sequence_number(sequence_number),
long_sequence_number(0),
payload_size(payload_size),
unacknowledged_data(0),
local_net_id(local_net_id),
remote_net_id(remote_net_id),
pacing_info(pacing_info),
ssrc(0),
rtp_sequence_number(0) {}
PacketFeedback::PacketFeedback(const PacketFeedback&) = default;
PacketFeedback& PacketFeedback::operator=(const PacketFeedback&) = default;
PacketFeedback::~PacketFeedback() = default;
bool PacketFeedback::operator==(const PacketFeedback& rhs) const {
return arrival_time_ms == rhs.arrival_time_ms &&
send_time_ms == rhs.send_time_ms &&
sequence_number == rhs.sequence_number &&
payload_size == rhs.payload_size && pacing_info == rhs.pacing_info;
}
void RtpPacketCounter::AddPacket(const RtpPacket& packet) {
++packets;
header_bytes += packet.headers_size();

View File

@ -211,75 +211,6 @@ class RtcpBandwidthObserver {
virtual ~RtcpBandwidthObserver() {}
};
struct PacketFeedback {
PacketFeedback(int64_t arrival_time_ms, uint16_t sequence_number);
PacketFeedback(int64_t arrival_time_ms,
int64_t send_time_ms,
uint16_t sequence_number,
size_t payload_size,
const PacedPacketInfo& pacing_info);
PacketFeedback(int64_t creation_time_ms,
uint16_t sequence_number,
size_t payload_size,
uint16_t local_net_id,
uint16_t remote_net_id,
const PacedPacketInfo& pacing_info);
PacketFeedback(int64_t creation_time_ms,
int64_t arrival_time_ms,
int64_t send_time_ms,
uint16_t sequence_number,
size_t payload_size,
uint16_t local_net_id,
uint16_t remote_net_id,
const PacedPacketInfo& pacing_info);
PacketFeedback(const PacketFeedback&);
PacketFeedback& operator=(const PacketFeedback&);
~PacketFeedback();
static constexpr int kNotAProbe = -1;
static constexpr int64_t kNotReceived = -1;
static constexpr int64_t kNoSendTime = -1;
// NOTE! The variable |creation_time_ms| is not used when testing equality.
// This is due to |creation_time_ms| only being used by SendTimeHistory
// for book-keeping, and is of no interest outside that class.
// TODO(philipel): Remove |creation_time_ms| from PacketFeedback when cleaning
// up SendTimeHistory.
bool operator==(const PacketFeedback& rhs) const;
// Time corresponding to when this object was created.
int64_t creation_time_ms;
// Time corresponding to when the packet was received. Timestamped with the
// receiver's clock. For unreceived packet, the sentinel value kNotReceived
// is used.
int64_t arrival_time_ms;
// Time corresponding to when the packet was sent, timestamped with the
// sender's clock.
int64_t send_time_ms;
// Packet identifier, incremented with 1 for every packet generated by the
// sender.
uint16_t sequence_number;
// Session unique packet identifier, incremented with 1 for every packet
// generated by the sender.
int64_t long_sequence_number;
// Size of the packet excluding RTP headers.
size_t payload_size;
// Size of preceeding packets that are not part of feedback.
size_t unacknowledged_data;
// The network route ids that this packet is associated with.
uint16_t local_net_id;
uint16_t remote_net_id;
// Pacing information about this packet.
PacedPacketInfo pacing_info;
// The SSRC and RTP sequence number of the packet this feedback refers to.
absl::optional<uint32_t> ssrc;
uint16_t rtp_sequence_number;
};
struct RtpPacketSendInfo {
public:
RtpPacketSendInfo() = default;
@ -320,18 +251,6 @@ class RtcpFeedbackSenderInterface {
virtual void UnsetRemb() = 0;
};
// DEPRECATED: To be removed when usages have been removed.
class PacketFeedbackObserver {
public:
virtual ~PacketFeedbackObserver() = default;
// DEPRECATED: OnPacketAdded will not actually be called.
// TODO(srte): Remove when all overrides have been removed.
virtual void OnPacketAdded(uint32_t ssrc, uint16_t seq_num) {}
virtual void OnPacketFeedbackVector(
const std::vector<PacketFeedback>& packet_feedback_vector) = 0;
};
class StreamFeedbackObserver {
public:
struct StreamPacketInfo {

View File

@ -262,7 +262,7 @@ void TransportFeedback::LastChunk::DecodeRunLength(uint16_t chunk,
}
TransportFeedback::TransportFeedback()
: TransportFeedback(/*include_timestamps=*/true, /*include_lost*/ false) {}
: TransportFeedback(/*include_timestamps=*/true, /*include_lost=*/true) {}
TransportFeedback::TransportFeedback(bool include_timestamps, bool include_lost)
: include_lost_(include_lost),
@ -335,9 +335,12 @@ bool TransportFeedback::AddReceivedPacket(uint16_t sequence_number,
uint16_t last_seq_no = next_seq_no - 1;
if (!IsNewerSequenceNumber(sequence_number, last_seq_no))
return false;
for (; next_seq_no != sequence_number; ++next_seq_no)
for (; next_seq_no != sequence_number; ++next_seq_no) {
if (!AddDeltaSize(0))
return false;
if (include_lost_)
all_packets_.emplace_back(next_seq_no);
}
}
DeltaSize delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2;
@ -345,6 +348,8 @@ bool TransportFeedback::AddReceivedPacket(uint16_t sequence_number,
return false;
received_packets_.emplace_back(sequence_number, delta);
if (include_lost_)
all_packets_.emplace_back(sequence_number, delta);
last_timestamp_us_ += delta * kDeltaScaleFactor;
if (include_timestamps_) {
size_bytes_ += delta_size;

View File

@ -100,7 +100,6 @@ class MockTransportFeedbackObserver : public TransportFeedbackObserver {
public:
MOCK_METHOD1(OnAddPacket, void(const RtpPacketSendInfo&));
MOCK_METHOD1(OnTransportFeedback, void(const rtcp::TransportFeedback&));
MOCK_CONST_METHOD0(GetTransportFeedbackVector, std::vector<PacketFeedback>());
};
class MockModuleRtpRtcp : public RTCPReceiver::ModuleRtpRtcp {

View File

@ -180,7 +180,6 @@ class MockTransportFeedbackObserver : public TransportFeedbackObserver {
public:
MOCK_METHOD1(OnAddPacket, void(const RtpPacketSendInfo&));
MOCK_METHOD1(OnTransportFeedback, void(const rtcp::TransportFeedback&));
MOCK_CONST_METHOD0(GetTransportFeedbackVector, std::vector<PacketFeedback>());
};
class MockOverheadObserver : public OverheadObserver {