Moves functionality to TransportFeedbackAdapter.

This moves simple logic from SendSideCongestionController to
TransportFeedbackAdapter. The purpose is to make it easier to
reuse TransportFeedbackAdapter without requiring everything
in SendSideCongestionController.

Bug: webrtc:9586
Change-Id: I35acedd15001d75a06c38ece76868afecd6afa18
Reviewed-on: https://webrtc-review.googlesource.com/c/105106
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Björn Terelius <terelius@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25177}
This commit is contained in:
Sebastian Jansson
2018-10-15 14:24:03 +02:00
committed by Commit Bot
parent ed04912ccd
commit 7341ab60d0
8 changed files with 128 additions and 123 deletions

View File

@ -69,8 +69,10 @@ rtc_static_library("transport_feedback") {
deps = [
"../..:module_api",
"../../../api/transport:network_control",
"../../../api/units:data_size",
"../../../rtc_base:checks",
"../../../rtc_base:rtc_base_approved",
"../../../rtc_base/network:sent_packet",
"../../../system_wrappers",
"../../rtp_rtcp:rtp_rtcp_format",
]

View File

@ -48,41 +48,6 @@ bool IsPacerPushbackExperimentEnabled() {
}
void SortPacketFeedbackVector(std::vector<webrtc::PacketFeedback>* input) {
std::sort(input->begin(), input->end(), PacketFeedbackComparator());
}
PacketResult NetworkPacketFeedbackFromRtpPacketFeedback(
const webrtc::PacketFeedback& pf) {
PacketResult feedback;
if (pf.arrival_time_ms == webrtc::PacketFeedback::kNotReceived)
feedback.receive_time = Timestamp::PlusInfinity();
else
feedback.receive_time = Timestamp::ms(pf.arrival_time_ms);
if (pf.send_time_ms != webrtc::PacketFeedback::kNoSendTime) {
feedback.sent_packet = SentPacket();
feedback.sent_packet->sequence_number = pf.long_sequence_number;
feedback.sent_packet->send_time = Timestamp::ms(pf.send_time_ms);
feedback.sent_packet->size = DataSize::bytes(pf.payload_size);
feedback.sent_packet->pacing_info = pf.pacing_info;
}
return feedback;
}
std::vector<PacketResult> PacketResultsFromRtpFeedbackVector(
const std::vector<PacketFeedback>& feedback_vector) {
RTC_DCHECK(std::is_sorted(feedback_vector.begin(), feedback_vector.end(),
PacketFeedbackComparator()));
std::vector<PacketResult> packet_feedbacks;
packet_feedbacks.reserve(feedback_vector.size());
for (const PacketFeedback& rtp_feedback : feedback_vector) {
auto feedback = NetworkPacketFeedbackFromRtpPacketFeedback(rtp_feedback);
packet_feedbacks.push_back(feedback);
}
return packet_feedbacks;
}
TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
int max_bitrate_bps,
int start_bitrate_bps,
@ -547,29 +512,16 @@ void SendSideCongestionController::SignalNetworkState(NetworkState state) {
void SendSideCongestionController::OnSentPacket(
const rtc::SentPacket& sent_packet) {
if (sent_packet.packet_id != -1) {
transport_feedback_adapter_.OnSentPacket(sent_packet.packet_id,
sent_packet.send_time_ms);
MaybeUpdateOutstandingData();
auto packet = transport_feedback_adapter_.GetPacket(sent_packet.packet_id);
if (packet.has_value()) {
SentPacket msg;
msg.size = DataSize::bytes(packet->payload_size);
msg.send_time = Timestamp::ms(packet->send_time_ms);
msg.sequence_number = packet->long_sequence_number;
msg.prior_unacked_data = DataSize::bytes(packet->unacknowledged_data);
msg.data_in_flight =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnSentPacket(msg));
});
}
} else if (sent_packet.info.included_in_allocation) {
transport_feedback_adapter_.AddUntracked(sent_packet.info.packet_size_bytes,
sent_packet.send_time_ms);
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
if (packet_msg) {
task_queue_->PostTask([this, packet_msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg));
});
}
MaybeUpdateOutstandingData();
}
void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
@ -654,36 +606,22 @@ void SendSideCongestionController::AddPacket(
void SendSideCongestionController::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
int64_t feedback_time_ms = clock_->TimeInMilliseconds();
DataSize prior_in_flight =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
transport_feedback_adapter_.OnTransportFeedback(feedback);
MaybeUpdateOutstandingData();
std::vector<PacketFeedback> feedback_vector =
transport_feedback_adapter_.GetTransportFeedbackVector();
SortPacketFeedbackVector(&feedback_vector);
if (!feedback_vector.empty()) {
TransportPacketsFeedback msg;
msg.packet_feedbacks = PacketResultsFromRtpFeedbackVector(feedback_vector);
msg.feedback_time = Timestamp::ms(feedback_time_ms);
msg.prior_in_flight = prior_in_flight;
msg.data_in_flight =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, msg]() {
absl::optional<TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessTransportFeedback(feedback);
if (feedback_msg) {
task_queue_->PostTask([this, feedback_msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(
controller_->OnTransportPacketsFeedback(msg));
controller_->OnTransportPacketsFeedback(*feedback_msg));
});
}
MaybeUpdateOutstandingData();
}
void SendSideCongestionController::MaybeUpdateOutstandingData() {
DataSize in_flight_data =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData();
task_queue_->PostTask([this, in_flight_data]() {
RTC_DCHECK_RUN_ON(task_queue_);
pacer_controller_->OnOutstandingData(in_flight_data);

View File

@ -111,13 +111,13 @@ bool SendTimeHistory::GetFeedback(PacketFeedback* packet_feedback,
return true;
}
size_t SendTimeHistory::GetOutstandingBytes(uint16_t local_net_id,
uint16_t remote_net_id) const {
DataSize SendTimeHistory::GetOutstandingData(uint16_t local_net_id,
uint16_t remote_net_id) const {
auto it = in_flight_bytes_.find({local_net_id, remote_net_id});
if (it != in_flight_bytes_.end()) {
return it->second;
return DataSize::bytes(it->second);
} else {
return 0;
return DataSize::Zero();
}
}

View File

@ -14,6 +14,7 @@
#include <map>
#include <utility>
#include "api/units/data_size.h"
#include "modules/include/module_common_types.h"
#include "rtc_base/constructormagic.h"
@ -43,8 +44,8 @@ class SendTimeHistory {
// thus be non-null and have the sequence_number field set.
bool GetFeedback(PacketFeedback* packet_feedback, bool remove);
size_t GetOutstandingBytes(uint16_t local_net_id,
uint16_t remote_net_id) const;
DataSize GetOutstandingData(uint16_t local_net_id,
uint16_t remote_net_id) const;
private:
using RemoteAndLocalNetworkId = std::pair<uint16_t, uint16_t>;

View File

@ -20,7 +20,42 @@
namespace webrtc {
namespace webrtc_cc {
namespace {
void SortPacketFeedbackVector(std::vector<webrtc::PacketFeedback>* input) {
std::sort(input->begin(), input->end(), PacketFeedbackComparator());
}
PacketResult NetworkPacketFeedbackFromRtpPacketFeedback(
const webrtc::PacketFeedback& pf) {
PacketResult feedback;
if (pf.arrival_time_ms == webrtc::PacketFeedback::kNotReceived)
feedback.receive_time = Timestamp::PlusInfinity();
else
feedback.receive_time = Timestamp::ms(pf.arrival_time_ms);
if (pf.send_time_ms != webrtc::PacketFeedback::kNoSendTime) {
feedback.sent_packet = SentPacket();
feedback.sent_packet->sequence_number = pf.long_sequence_number;
feedback.sent_packet->send_time = Timestamp::ms(pf.send_time_ms);
feedback.sent_packet->size = DataSize::bytes(pf.payload_size);
feedback.sent_packet->pacing_info = pf.pacing_info;
}
return feedback;
}
std::vector<PacketResult> PacketResultsFromRtpFeedbackVector(
const std::vector<PacketFeedback>& feedback_vector) {
RTC_DCHECK(std::is_sorted(feedback_vector.begin(), feedback_vector.end(),
PacketFeedbackComparator()));
std::vector<PacketResult> packet_feedbacks;
packet_feedbacks.reserve(feedback_vector.size());
for (const PacketFeedback& rtp_feedback : feedback_vector) {
auto feedback = NetworkPacketFeedbackFromRtpPacketFeedback(rtp_feedback);
packet_feedbacks.push_back(feedback);
}
return packet_feedbacks;
}
} // namespace
const int64_t kNoTimestamp = -1;
const int64_t kSendTimeHistoryWindowMs = 60000;
const int64_t kBaseTimestampScaleFactor =
@ -77,22 +112,49 @@ void TransportFeedbackAdapter::AddPacket(uint32_t ssrc,
}
}
void TransportFeedbackAdapter::AddUntracked(size_t packet_size,
int64_t send_time_ms) {
absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
const rtc::SentPacket& sent_packet) {
rtc::CritScope cs(&lock_);
send_time_history_.AddUntracked(packet_size, send_time_ms);
// TODO(srte): Only use one way to indicate that packet feedback is used.
if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) {
send_time_history_.OnSentPacket(sent_packet.packet_id,
sent_packet.send_time_ms);
absl::optional<PacketFeedback> packet =
send_time_history_.GetPacket(sent_packet.packet_id);
if (packet) {
SentPacket msg;
msg.size = DataSize::bytes(packet->payload_size);
msg.send_time = Timestamp::ms(packet->send_time_ms);
msg.sequence_number = packet->long_sequence_number;
msg.prior_unacked_data = DataSize::bytes(packet->unacknowledged_data);
msg.data_in_flight =
send_time_history_.GetOutstandingData(local_net_id_, remote_net_id_);
return msg;
}
} else if (sent_packet.info.included_in_allocation) {
send_time_history_.AddUntracked(sent_packet.info.packet_size_bytes,
sent_packet.send_time_ms);
}
return absl::nullopt;
}
void TransportFeedbackAdapter::OnSentPacket(uint16_t sequence_number,
int64_t send_time_ms) {
rtc::CritScope cs(&lock_);
send_time_history_.OnSentPacket(sequence_number, send_time_ms);
}
absl::optional<TransportPacketsFeedback>
TransportFeedbackAdapter::ProcessTransportFeedback(
const rtcp::TransportFeedback& feedback) {
int64_t feedback_time_ms = clock_->TimeInMilliseconds();
DataSize prior_in_flight = GetOutstandingData();
OnTransportFeedback(feedback);
std::vector<PacketFeedback> feedback_vector = last_packet_feedback_vector_;
if (feedback_vector.empty())
return absl::nullopt;
absl::optional<PacketFeedback> TransportFeedbackAdapter::GetPacket(
uint16_t sequence_number) const {
rtc::CritScope cs(&lock_);
return send_time_history_.GetPacket(sequence_number);
SortPacketFeedbackVector(&feedback_vector);
TransportPacketsFeedback msg;
msg.packet_feedbacks = PacketResultsFromRtpFeedbackVector(feedback_vector);
msg.feedback_time = Timestamp::ms(feedback_time_ms);
msg.prior_in_flight = prior_in_flight;
msg.data_in_flight = GetOutstandingData();
return msg;
}
void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id,
@ -102,6 +164,11 @@ void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id,
remote_net_id_ = remote_id;
}
DataSize TransportFeedbackAdapter::GetOutstandingData() const {
rtc::CritScope cs(&lock_);
return send_time_history_.GetOutstandingData(local_net_id_, remote_net_id_);
}
std::vector<PacketFeedback> TransportFeedbackAdapter::GetPacketFeedbackVector(
const rtcp::TransportFeedback& feedback) {
int64_t timestamp_us = feedback.GetBaseTimeUs();
@ -190,10 +257,5 @@ std::vector<PacketFeedback>
TransportFeedbackAdapter::GetTransportFeedbackVector() const {
return last_packet_feedback_vector_;
}
size_t TransportFeedbackAdapter::GetOutstandingBytes() const {
rtc::CritScope cs(&lock_);
return send_time_history_.GetOutstandingBytes(local_net_id_, remote_net_id_);
}
} // namespace webrtc_cc
} // namespace webrtc

View File

@ -17,6 +17,7 @@
#include "api/transport/network_types.h"
#include "modules/congestion_controller/rtp/send_time_history.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/network/sent_packet.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/thread_checker.h"
#include "system_wrappers/include/clock.h"
@ -42,23 +43,22 @@ class TransportFeedbackAdapter {
uint16_t sequence_number,
size_t length,
const PacedPacketInfo& pacing_info);
void AddUntracked(size_t packet_size, int64_t send_time_ms);
void OnSentPacket(uint16_t sequence_number, int64_t send_time_ms);
// TODO(holmer): This method should return DelayBasedBwe::Result so that we
// can get rid of the dependency on BitrateController. Requires changes
// to the CongestionController interface.
void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
absl::optional<SentPacket> ProcessSentPacket(
const rtc::SentPacket& sent_packet);
absl::optional<TransportPacketsFeedback> ProcessTransportFeedback(
const rtcp::TransportFeedback& feedback);
std::vector<PacketFeedback> GetTransportFeedbackVector() const;
absl::optional<PacketFeedback> GetPacket(uint16_t sequence_number) const;
void SetTransportOverhead(int transport_overhead_bytes_per_packet);
void SetNetworkIds(uint16_t local_id, uint16_t remote_id);
size_t GetOutstandingBytes() const;
DataSize GetOutstandingData() const;
private:
void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
std::vector<PacketFeedback> GetPacketFeedbackVector(
const rtcp::TransportFeedback& feedback);

View File

@ -69,8 +69,9 @@ class TransportFeedbackAdapterTest : public ::testing::Test {
adapter_->AddPacket(kSsrc, packet_feedback.sequence_number,
packet_feedback.payload_size,
packet_feedback.pacing_info);
adapter_->OnSentPacket(packet_feedback.sequence_number,
packet_feedback.send_time_ms);
adapter_->ProcessSentPacket(rtc::SentPacket(packet_feedback.sequence_number,
packet_feedback.send_time_ms,
rtc::PacketInfo()));
}
static constexpr uint32_t kSsrc = 8492;
@ -100,7 +101,7 @@ TEST_F(TransportFeedbackAdapterTest, ObserverSanity) {
}
EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1);
adapter_->OnTransportFeedback(feedback);
adapter_->ProcessTransportFeedback(feedback);
adapter_->DeRegisterPacketFeedbackObserver(&mock);
@ -115,7 +116,7 @@ TEST_F(TransportFeedbackAdapterTest, ObserverSanity) {
EXPECT_TRUE(feedback.AddReceivedPacket(new_packet.sequence_number,
new_packet.arrival_time_ms * 1000));
EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0);
adapter_->OnTransportFeedback(second_feedback);
adapter_->ProcessTransportFeedback(second_feedback);
}
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
@ -156,7 +157,7 @@ TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) {
feedback.Build();
adapter_->OnTransportFeedback(feedback);
adapter_->ProcessTransportFeedback(feedback);
ComparePacketFeedbackVectors(packets, adapter_->GetTransportFeedbackVector());
}
@ -189,7 +190,7 @@ TEST_F(TransportFeedbackAdapterTest, FeedbackVectorReportsUnreceived) {
feedback.Build();
adapter_->OnTransportFeedback(feedback);
adapter_->ProcessTransportFeedback(feedback);
ComparePacketFeedbackVectors(sent_packets,
adapter_->GetTransportFeedbackVector());
}
@ -233,7 +234,7 @@ TEST_F(TransportFeedbackAdapterTest, HandlesDroppedPackets) {
expected_packets[i].pacing_info = PacedPacketInfo();
}
adapter_->OnTransportFeedback(feedback);
adapter_->ProcessTransportFeedback(feedback);
ComparePacketFeedbackVectors(expected_packets,
adapter_->GetTransportFeedbackVector());
}
@ -269,7 +270,7 @@ TEST_F(TransportFeedbackAdapterTest, SendTimeWrapsBothWays) {
std::vector<PacketFeedback> expected_packets;
expected_packets.push_back(packets[i]);
adapter_->OnTransportFeedback(*feedback.get());
adapter_->ProcessTransportFeedback(*feedback.get());
ComparePacketFeedbackVectors(expected_packets,
adapter_->GetTransportFeedbackVector());
}
@ -298,7 +299,7 @@ TEST_F(TransportFeedbackAdapterTest, HandlesArrivalReordering) {
// Adapter keeps the packets ordered by sequence number (which is itself
// assigned by the order of transmission). Reordering by some other criteria,
// eg. arrival time, is up to the observers.
adapter_->OnTransportFeedback(feedback);
adapter_->ProcessTransportFeedback(feedback);
ComparePacketFeedbackVectors(packets, adapter_->GetTransportFeedbackVector());
}
@ -362,7 +363,7 @@ TEST_F(TransportFeedbackAdapterTest, TimestampDeltas) {
std::vector<PacketFeedback> received_feedback;
EXPECT_TRUE(feedback.get() != nullptr);
adapter_->OnTransportFeedback(*feedback.get());
adapter_->ProcessTransportFeedback(*feedback.get());
ComparePacketFeedbackVectors(sent_packets,
adapter_->GetTransportFeedbackVector());
@ -377,7 +378,7 @@ TEST_F(TransportFeedbackAdapterTest, TimestampDeltas) {
rtcp::TransportFeedback::ParseFrom(raw_packet.data(), raw_packet.size());
EXPECT_TRUE(feedback.get() != nullptr);
adapter_->OnTransportFeedback(*feedback.get());
adapter_->ProcessTransportFeedback(*feedback.get());
{
std::vector<PacketFeedback> expected_packets;
expected_packets.push_back(packet_feedback);

View File

@ -201,6 +201,7 @@ absl::optional<int64_t> TransportFeedbackAdapter::GetMinFeedbackLoopRtt()
size_t TransportFeedbackAdapter::GetOutstandingBytes() const {
rtc::CritScope cs(&lock_);
return send_time_history_.GetOutstandingBytes(local_net_id_, remote_net_id_);
return send_time_history_.GetOutstandingData(local_net_id_, remote_net_id_)
.bytes();
}
} // namespace webrtc