Reland "Extracts ssrc based feedback tracking from feedback adapter."
This is a reland of 08c46adc1e9f9a8d74357fe132a68906ae6e6974 Original change's description: > Extracts ssrc based feedback tracking from feedback adapter. > > This prepares for moving TransportFeedbackAdapter to TaskQueue. > > Bug: webrtc:9883 > Change-Id: Ib333f6a6837ff6dd8b10813e8953e4d8cd5d8633 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162040 > Reviewed-by: Erik Språng <sprang@webrtc.org> > Commit-Queue: Sebastian Jansson <srte@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#30076} Bug: webrtc:9883 Change-Id: Ia74a3b1fba4d83eece9b0eb6475d6e6aecb65700 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162201 Reviewed-by: Erik Språng <sprang@webrtc.org> Commit-Queue: Sebastian Jansson <srte@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30266}
This commit is contained in:

committed by
Commit Bot

parent
ccab06fb72
commit
d61338fa6e
@ -228,7 +228,7 @@ void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
|
|||||||
}
|
}
|
||||||
StreamFeedbackProvider*
|
StreamFeedbackProvider*
|
||||||
RtpTransportControllerSend::GetStreamFeedbackProvider() {
|
RtpTransportControllerSend::GetStreamFeedbackProvider() {
|
||||||
return &transport_feedback_adapter_;
|
return &feedback_demuxer_;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
|
void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
|
||||||
@ -468,6 +468,8 @@ void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
|
|||||||
|
|
||||||
void RtpTransportControllerSend::OnAddPacket(
|
void RtpTransportControllerSend::OnAddPacket(
|
||||||
const RtpPacketSendInfo& packet_info) {
|
const RtpPacketSendInfo& packet_info) {
|
||||||
|
feedback_demuxer_.AddPacket(packet_info);
|
||||||
|
|
||||||
transport_feedback_adapter_.AddPacket(
|
transport_feedback_adapter_.AddPacket(
|
||||||
packet_info,
|
packet_info,
|
||||||
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load()
|
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load()
|
||||||
@ -478,6 +480,7 @@ void RtpTransportControllerSend::OnAddPacket(
|
|||||||
void RtpTransportControllerSend::OnTransportFeedback(
|
void RtpTransportControllerSend::OnTransportFeedback(
|
||||||
const rtcp::TransportFeedback& feedback) {
|
const rtcp::TransportFeedback& feedback) {
|
||||||
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
|
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
|
||||||
|
feedback_demuxer_.OnTransportFeedback(feedback);
|
||||||
|
|
||||||
absl::optional<TransportPacketsFeedback> feedback_msg =
|
absl::optional<TransportPacketsFeedback> feedback_msg =
|
||||||
transport_feedback_adapter_.ProcessTransportFeedback(
|
transport_feedback_adapter_.ProcessTransportFeedback(
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
#include "call/rtp_video_sender.h"
|
#include "call/rtp_video_sender.h"
|
||||||
#include "modules/congestion_controller/rtp/control_handler.h"
|
#include "modules/congestion_controller/rtp/control_handler.h"
|
||||||
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
|
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
|
||||||
|
#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
|
||||||
#include "modules/pacing/paced_sender.h"
|
#include "modules/pacing/paced_sender.h"
|
||||||
#include "modules/pacing/packet_router.h"
|
#include "modules/pacing/packet_router.h"
|
||||||
#include "modules/pacing/rtp_packet_pacer.h"
|
#include "modules/pacing/rtp_packet_pacer.h"
|
||||||
@ -149,6 +150,7 @@ class RtpTransportControllerSend final
|
|||||||
std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;
|
std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;
|
||||||
|
|
||||||
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
|
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
|
||||||
|
TransportFeedbackDemuxer feedback_demuxer_;
|
||||||
|
|
||||||
// TODO(srte): Move all access to feedback adapter to task queue.
|
// TODO(srte): Move all access to feedback adapter to task queue.
|
||||||
TransportFeedbackAdapter transport_feedback_adapter_;
|
TransportFeedbackAdapter transport_feedback_adapter_;
|
||||||
|
@ -45,6 +45,8 @@ rtc_library("transport_feedback") {
|
|||||||
sources = [
|
sources = [
|
||||||
"transport_feedback_adapter.cc",
|
"transport_feedback_adapter.cc",
|
||||||
"transport_feedback_adapter.h",
|
"transport_feedback_adapter.h",
|
||||||
|
"transport_feedback_demuxer.cc",
|
||||||
|
"transport_feedback_demuxer.h",
|
||||||
]
|
]
|
||||||
|
|
||||||
deps = [
|
deps = [
|
||||||
@ -69,6 +71,7 @@ if (rtc_include_tests) {
|
|||||||
|
|
||||||
sources = [
|
sources = [
|
||||||
"transport_feedback_adapter_unittest.cc",
|
"transport_feedback_adapter_unittest.cc",
|
||||||
|
"transport_feedback_demuxer_unittest.cc",
|
||||||
]
|
]
|
||||||
deps = [
|
deps = [
|
||||||
":transport_feedback",
|
":transport_feedback",
|
||||||
|
@ -66,30 +66,6 @@ DataSize InFlightBytesTracker::GetOutstandingData(
|
|||||||
|
|
||||||
TransportFeedbackAdapter::TransportFeedbackAdapter() = default;
|
TransportFeedbackAdapter::TransportFeedbackAdapter() = default;
|
||||||
|
|
||||||
TransportFeedbackAdapter::~TransportFeedbackAdapter() {
|
|
||||||
RTC_DCHECK(observers_.empty());
|
|
||||||
}
|
|
||||||
|
|
||||||
void TransportFeedbackAdapter::RegisterStreamFeedbackObserver(
|
|
||||||
std::vector<uint32_t> ssrcs,
|
|
||||||
StreamFeedbackObserver* observer) {
|
|
||||||
rtc::CritScope cs(&observers_lock_);
|
|
||||||
RTC_DCHECK(observer);
|
|
||||||
RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) {
|
|
||||||
return pair.second == observer;
|
|
||||||
}) == observers_.end());
|
|
||||||
observers_.push_back({ssrcs, observer});
|
|
||||||
}
|
|
||||||
|
|
||||||
void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver(
|
|
||||||
StreamFeedbackObserver* observer) {
|
|
||||||
rtc::CritScope cs(&observers_lock_);
|
|
||||||
RTC_DCHECK(observer);
|
|
||||||
const auto it = absl::c_find_if(
|
|
||||||
observers_, [=](const auto& pair) { return pair.second == observer; });
|
|
||||||
RTC_DCHECK(it != observers_.end());
|
|
||||||
observers_.erase(it);
|
|
||||||
}
|
|
||||||
|
|
||||||
void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
|
void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
|
||||||
size_t overhead_bytes,
|
size_t overhead_bytes,
|
||||||
@ -104,10 +80,6 @@ void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
|
|||||||
packet.local_net_id = local_net_id_;
|
packet.local_net_id = local_net_id_;
|
||||||
packet.remote_net_id = remote_net_id_;
|
packet.remote_net_id = remote_net_id_;
|
||||||
packet.sent.pacing_info = packet_info.pacing_info;
|
packet.sent.pacing_info = packet_info.pacing_info;
|
||||||
if (packet_info.has_rtp_sequence_number) {
|
|
||||||
packet.ssrc = packet_info.ssrc;
|
|
||||||
packet.rtp_sequence_number = packet_info.rtp_sequence_number;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!history_.empty() &&
|
while (!history_.empty() &&
|
||||||
creation_time - history_.begin()->second.creation_time >
|
creation_time - history_.begin()->second.creation_time >
|
||||||
@ -168,32 +140,25 @@ TransportFeedbackAdapter::ProcessTransportFeedback(
|
|||||||
RTC_LOG(LS_INFO) << "Empty transport feedback packet received.";
|
RTC_LOG(LS_INFO) << "Empty transport feedback packet received.";
|
||||||
return absl::nullopt;
|
return absl::nullopt;
|
||||||
}
|
}
|
||||||
std::vector<PacketFeedback> feedback_vector;
|
|
||||||
|
rtc::CritScope cs(&lock_);
|
||||||
TransportPacketsFeedback msg;
|
TransportPacketsFeedback msg;
|
||||||
msg.feedback_time = feedback_receive_time;
|
msg.feedback_time = feedback_receive_time;
|
||||||
{
|
|
||||||
rtc::CritScope cs(&lock_);
|
|
||||||
msg.prior_in_flight =
|
|
||||||
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
|
|
||||||
feedback_vector =
|
|
||||||
ProcessTransportFeedbackInner(feedback, feedback_receive_time);
|
|
||||||
if (feedback_vector.empty())
|
|
||||||
return absl::nullopt;
|
|
||||||
|
|
||||||
for (const PacketFeedback& fb : feedback_vector) {
|
msg.prior_in_flight =
|
||||||
PacketResult res;
|
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
|
||||||
res.sent_packet = fb.sent;
|
msg.packet_feedbacks =
|
||||||
res.receive_time = fb.receive_time;
|
ProcessTransportFeedbackInner(feedback, feedback_receive_time);
|
||||||
msg.packet_feedbacks.push_back(res);
|
if (msg.packet_feedbacks.empty())
|
||||||
}
|
return absl::nullopt;
|
||||||
auto it = history_.find(last_ack_seq_num_);
|
|
||||||
if (it != history_.end()) {
|
auto it = history_.find(last_ack_seq_num_);
|
||||||
msg.first_unacked_send_time = it->second.sent.send_time;
|
if (it != history_.end()) {
|
||||||
}
|
msg.first_unacked_send_time = it->second.sent.send_time;
|
||||||
msg.data_in_flight =
|
|
||||||
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
|
|
||||||
}
|
}
|
||||||
SignalObservers(feedback_vector);
|
msg.data_in_flight =
|
||||||
|
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
|
||||||
|
|
||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -209,7 +174,7 @@ DataSize TransportFeedbackAdapter::GetOutstandingData() const {
|
|||||||
return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
|
return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<PacketFeedback>
|
std::vector<PacketResult>
|
||||||
TransportFeedbackAdapter::ProcessTransportFeedbackInner(
|
TransportFeedbackAdapter::ProcessTransportFeedbackInner(
|
||||||
const rtcp::TransportFeedback& feedback,
|
const rtcp::TransportFeedback& feedback,
|
||||||
Timestamp feedback_time) {
|
Timestamp feedback_time) {
|
||||||
@ -232,8 +197,8 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner(
|
|||||||
}
|
}
|
||||||
last_timestamp_ = feedback.GetBaseTime();
|
last_timestamp_ = feedback.GetBaseTime();
|
||||||
|
|
||||||
std::vector<PacketFeedback> packet_feedback_vector;
|
std::vector<PacketResult> packet_result_vector;
|
||||||
packet_feedback_vector.reserve(feedback.GetPacketStatusCount());
|
packet_result_vector.reserve(feedback.GetPacketStatusCount());
|
||||||
|
|
||||||
size_t failed_lookups = 0;
|
size_t failed_lookups = 0;
|
||||||
size_t ignored = 0;
|
size_t ignored = 0;
|
||||||
@ -276,7 +241,10 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner(
|
|||||||
}
|
}
|
||||||
if (packet_feedback.local_net_id == local_net_id_ &&
|
if (packet_feedback.local_net_id == local_net_id_ &&
|
||||||
packet_feedback.remote_net_id == remote_net_id_) {
|
packet_feedback.remote_net_id == remote_net_id_) {
|
||||||
packet_feedback_vector.push_back(packet_feedback);
|
PacketResult result;
|
||||||
|
result.sent_packet = packet_feedback.sent;
|
||||||
|
result.receive_time = packet_feedback.receive_time;
|
||||||
|
packet_result_vector.push_back(result);
|
||||||
} else {
|
} else {
|
||||||
++ignored;
|
++ignored;
|
||||||
}
|
}
|
||||||
@ -292,27 +260,7 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner(
|
|||||||
<< " packets because they were sent on a different route.";
|
<< " packets because they were sent on a different route.";
|
||||||
}
|
}
|
||||||
|
|
||||||
return packet_feedback_vector;
|
return packet_result_vector;
|
||||||
}
|
|
||||||
|
|
||||||
void TransportFeedbackAdapter::SignalObservers(
|
|
||||||
const std::vector<PacketFeedback>& feedback_vector) {
|
|
||||||
rtc::CritScope cs(&observers_lock_);
|
|
||||||
for (auto& observer : observers_) {
|
|
||||||
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
|
|
||||||
for (const auto& packet : feedback_vector) {
|
|
||||||
if (packet.ssrc && absl::c_count(observer.first, *packet.ssrc) > 0) {
|
|
||||||
StreamFeedbackObserver::StreamPacketInfo packet_info;
|
|
||||||
packet_info.ssrc = *packet.ssrc;
|
|
||||||
packet_info.rtp_sequence_number = packet.rtp_sequence_number;
|
|
||||||
packet_info.received = packet.receive_time.IsFinite();
|
|
||||||
selected_feedback.push_back(packet_info);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!selected_feedback.empty()) {
|
|
||||||
observer.second->OnPacketFeedbackVector(std::move(selected_feedback));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace webrtc
|
} // namespace webrtc
|
||||||
|
@ -38,9 +38,6 @@ struct PacketFeedback {
|
|||||||
// The network route ids that this packet is associated with.
|
// The network route ids that this packet is associated with.
|
||||||
uint16_t local_net_id = 0;
|
uint16_t local_net_id = 0;
|
||||||
uint16_t remote_net_id = 0;
|
uint16_t remote_net_id = 0;
|
||||||
// The SSRC and RTP sequence number of the packet this feedback refers to.
|
|
||||||
absl::optional<uint32_t> ssrc;
|
|
||||||
uint16_t rtp_sequence_number = 0;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class InFlightBytesTracker {
|
class InFlightBytesTracker {
|
||||||
@ -55,16 +52,9 @@ class InFlightBytesTracker {
|
|||||||
std::map<RemoteAndLocalNetworkId, DataSize> in_flight_data_;
|
std::map<RemoteAndLocalNetworkId, DataSize> in_flight_data_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class TransportFeedbackAdapter : public StreamFeedbackProvider {
|
class TransportFeedbackAdapter {
|
||||||
public:
|
public:
|
||||||
TransportFeedbackAdapter();
|
TransportFeedbackAdapter();
|
||||||
virtual ~TransportFeedbackAdapter();
|
|
||||||
|
|
||||||
void RegisterStreamFeedbackObserver(
|
|
||||||
std::vector<uint32_t> ssrcs,
|
|
||||||
StreamFeedbackObserver* observer) override;
|
|
||||||
void DeRegisterStreamFeedbackObserver(
|
|
||||||
StreamFeedbackObserver* observer) override;
|
|
||||||
|
|
||||||
void AddPacket(const RtpPacketSendInfo& packet_info,
|
void AddPacket(const RtpPacketSendInfo& packet_info,
|
||||||
size_t overhead_bytes,
|
size_t overhead_bytes,
|
||||||
@ -83,15 +73,10 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider {
|
|||||||
private:
|
private:
|
||||||
enum class SendTimeHistoryStatus { kNotAdded, kOk, kDuplicate };
|
enum class SendTimeHistoryStatus { kNotAdded, kOk, kDuplicate };
|
||||||
|
|
||||||
void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
|
std::vector<PacketResult> ProcessTransportFeedbackInner(
|
||||||
|
|
||||||
std::vector<PacketFeedback> ProcessTransportFeedbackInner(
|
|
||||||
const rtcp::TransportFeedback& feedback,
|
const rtcp::TransportFeedback& feedback,
|
||||||
Timestamp feedback_time) RTC_RUN_ON(&lock_);
|
Timestamp feedback_time) RTC_RUN_ON(&lock_);
|
||||||
|
|
||||||
void SignalObservers(
|
|
||||||
const std::vector<PacketFeedback>& packet_feedback_vector);
|
|
||||||
|
|
||||||
rtc::CriticalSection lock_;
|
rtc::CriticalSection lock_;
|
||||||
DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero();
|
DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero();
|
||||||
Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity();
|
Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity();
|
||||||
@ -110,13 +95,6 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider {
|
|||||||
|
|
||||||
uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0;
|
uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0;
|
||||||
uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0;
|
uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0;
|
||||||
|
|
||||||
rtc::CriticalSection observers_lock_;
|
|
||||||
// Maps a set of ssrcs to corresponding observer. Vectors are used rather than
|
|
||||||
// set/map to ensure that the processing order is consistent independently of
|
|
||||||
// the randomized ssrcs.
|
|
||||||
std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
|
|
||||||
observers_ RTC_GUARDED_BY(&observers_lock_);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace webrtc
|
} // namespace webrtc
|
||||||
|
@ -126,58 +126,6 @@ class TransportFeedbackAdapterTest : public ::testing::Test {
|
|||||||
std::unique_ptr<TransportFeedbackAdapter> adapter_;
|
std::unique_ptr<TransportFeedbackAdapter> adapter_;
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST_F(TransportFeedbackAdapterTest, ObserverSanity) {
|
|
||||||
MockStreamFeedbackObserver mock;
|
|
||||||
adapter_->RegisterStreamFeedbackObserver({kSsrc}, &mock);
|
|
||||||
|
|
||||||
const std::vector<PacketResult> packets = {
|
|
||||||
CreatePacket(100, 200, 0, 1000, kPacingInfo0),
|
|
||||||
CreatePacket(110, 210, 1, 2000, kPacingInfo0),
|
|
||||||
CreatePacket(120, 220, 2, 3000, kPacingInfo0)};
|
|
||||||
|
|
||||||
rtcp::TransportFeedback feedback;
|
|
||||||
feedback.SetBase(packets[0].sent_packet.sequence_number,
|
|
||||||
packets[0].receive_time.us());
|
|
||||||
|
|
||||||
for (const auto& packet : packets) {
|
|
||||||
OnSentPacket(packet);
|
|
||||||
EXPECT_TRUE(feedback.AddReceivedPacket(packet.sent_packet.sequence_number,
|
|
||||||
packet.receive_time.us()));
|
|
||||||
}
|
|
||||||
|
|
||||||
EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1);
|
|
||||||
adapter_->ProcessTransportFeedback(feedback, clock_.CurrentTime());
|
|
||||||
|
|
||||||
adapter_->DeRegisterStreamFeedbackObserver(&mock);
|
|
||||||
|
|
||||||
auto new_packet = CreatePacket(130, 230, 3, 4000, kPacingInfo0);
|
|
||||||
OnSentPacket(new_packet);
|
|
||||||
|
|
||||||
rtcp::TransportFeedback second_feedback;
|
|
||||||
second_feedback.SetBase(new_packet.sent_packet.sequence_number,
|
|
||||||
new_packet.receive_time.us());
|
|
||||||
EXPECT_TRUE(second_feedback.AddReceivedPacket(
|
|
||||||
new_packet.sent_packet.sequence_number, new_packet.receive_time.us()));
|
|
||||||
EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0);
|
|
||||||
adapter_->ProcessTransportFeedback(second_feedback, clock_.CurrentTime());
|
|
||||||
}
|
|
||||||
|
|
||||||
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
|
|
||||||
TEST_F(TransportFeedbackAdapterTest, ObserverDoubleRegistrationDeathTest) {
|
|
||||||
MockStreamFeedbackObserver mock;
|
|
||||||
adapter_->RegisterStreamFeedbackObserver({0}, &mock);
|
|
||||||
EXPECT_DEATH(adapter_->RegisterStreamFeedbackObserver({0}, &mock), "");
|
|
||||||
adapter_->DeRegisterStreamFeedbackObserver(&mock);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(TransportFeedbackAdapterTest, ObserverMissingDeRegistrationDeathTest) {
|
|
||||||
MockStreamFeedbackObserver mock;
|
|
||||||
adapter_->RegisterStreamFeedbackObserver({0}, &mock);
|
|
||||||
EXPECT_DEATH(adapter_.reset(), "");
|
|
||||||
adapter_->DeRegisterStreamFeedbackObserver(&mock);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) {
|
TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) {
|
||||||
std::vector<PacketResult> packets;
|
std::vector<PacketResult> packets;
|
||||||
packets.push_back(CreatePacket(100, 200, 0, 1500, kPacingInfo0));
|
packets.push_back(CreatePacket(100, 200, 0, 1500, kPacingInfo0));
|
||||||
|
@ -0,0 +1,88 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
|
||||||
|
#include "absl/algorithm/container.h"
|
||||||
|
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
|
||||||
|
|
||||||
|
namespace webrtc {
|
||||||
|
namespace {
|
||||||
|
static const size_t kMaxPacketsInHistory = 5000;
|
||||||
|
}
|
||||||
|
void TransportFeedbackDemuxer::RegisterStreamFeedbackObserver(
|
||||||
|
std::vector<uint32_t> ssrcs,
|
||||||
|
StreamFeedbackObserver* observer) {
|
||||||
|
rtc::CritScope cs(&observers_lock_);
|
||||||
|
RTC_DCHECK(observer);
|
||||||
|
RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) {
|
||||||
|
return pair.second == observer;
|
||||||
|
}) == observers_.end());
|
||||||
|
observers_.push_back({ssrcs, observer});
|
||||||
|
}
|
||||||
|
|
||||||
|
void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver(
|
||||||
|
StreamFeedbackObserver* observer) {
|
||||||
|
rtc::CritScope cs(&observers_lock_);
|
||||||
|
RTC_DCHECK(observer);
|
||||||
|
const auto it = absl::c_find_if(
|
||||||
|
observers_, [=](const auto& pair) { return pair.second == observer; });
|
||||||
|
RTC_DCHECK(it != observers_.end());
|
||||||
|
observers_.erase(it);
|
||||||
|
}
|
||||||
|
|
||||||
|
void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
|
||||||
|
rtc::CritScope cs(&lock_);
|
||||||
|
if (packet_info.has_rtp_sequence_number && packet_info.ssrc != 0) {
|
||||||
|
StreamFeedbackObserver::StreamPacketInfo info;
|
||||||
|
info.ssrc = packet_info.ssrc;
|
||||||
|
info.rtp_sequence_number = packet_info.rtp_sequence_number;
|
||||||
|
info.received = false;
|
||||||
|
history_.insert(
|
||||||
|
{seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number),
|
||||||
|
info});
|
||||||
|
}
|
||||||
|
while (history_.size() > kMaxPacketsInHistory) {
|
||||||
|
history_.erase(history_.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void TransportFeedbackDemuxer::OnTransportFeedback(
|
||||||
|
const rtcp::TransportFeedback& feedback) {
|
||||||
|
std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
|
||||||
|
{
|
||||||
|
rtc::CritScope cs(&lock_);
|
||||||
|
for (const auto& packet : feedback.GetAllPackets()) {
|
||||||
|
int64_t seq_num =
|
||||||
|
seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
|
||||||
|
auto it = history_.find(seq_num);
|
||||||
|
if (it != history_.end()) {
|
||||||
|
auto packet_info = it->second;
|
||||||
|
packet_info.received = packet.received();
|
||||||
|
stream_feedbacks.push_back(packet_info);
|
||||||
|
if (packet.received())
|
||||||
|
history_.erase(it);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rtc::CritScope cs(&observers_lock_);
|
||||||
|
for (auto& observer : observers_) {
|
||||||
|
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
|
||||||
|
for (const auto& packet_info : stream_feedbacks) {
|
||||||
|
if (absl::c_count(observer.first, packet_info.ssrc) > 0) {
|
||||||
|
selected_feedback.push_back(packet_info);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!selected_feedback.empty()) {
|
||||||
|
observer.second->OnPacketFeedbackVector(std::move(selected_feedback));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace webrtc
|
@ -0,0 +1,49 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#ifndef MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_
|
||||||
|
#define MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_
|
||||||
|
|
||||||
|
#include <map>
|
||||||
|
#include <utility>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "modules/include/module_common_types_public.h"
|
||||||
|
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
|
||||||
|
#include "rtc_base/critical_section.h"
|
||||||
|
|
||||||
|
namespace webrtc {
|
||||||
|
|
||||||
|
class TransportFeedbackDemuxer : public StreamFeedbackProvider {
|
||||||
|
public:
|
||||||
|
// Implements StreamFeedbackProvider interface
|
||||||
|
void RegisterStreamFeedbackObserver(
|
||||||
|
std::vector<uint32_t> ssrcs,
|
||||||
|
StreamFeedbackObserver* observer) override;
|
||||||
|
void DeRegisterStreamFeedbackObserver(
|
||||||
|
StreamFeedbackObserver* observer) override;
|
||||||
|
void AddPacket(const RtpPacketSendInfo& packet_info);
|
||||||
|
void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
|
||||||
|
|
||||||
|
private:
|
||||||
|
rtc::CriticalSection lock_;
|
||||||
|
SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
|
||||||
|
std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_
|
||||||
|
RTC_GUARDED_BY(&lock_);
|
||||||
|
|
||||||
|
// Maps a set of ssrcs to corresponding observer. Vectors are used rather than
|
||||||
|
// set/map to ensure that the processing order is consistent independently of
|
||||||
|
// the randomized ssrcs.
|
||||||
|
rtc::CriticalSection observers_lock_;
|
||||||
|
std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
|
||||||
|
observers_ RTC_GUARDED_BY(&observers_lock_);
|
||||||
|
};
|
||||||
|
} // namespace webrtc
|
||||||
|
|
||||||
|
#endif // MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_
|
@ -0,0 +1,67 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
|
||||||
|
|
||||||
|
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
|
||||||
|
#include "test/gmock.h"
|
||||||
|
#include "test/gtest.h"
|
||||||
|
|
||||||
|
namespace webrtc {
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
using ::testing::_;
|
||||||
|
static constexpr uint32_t kSsrc = 8492;
|
||||||
|
|
||||||
|
class MockStreamFeedbackObserver : public webrtc::StreamFeedbackObserver {
|
||||||
|
public:
|
||||||
|
MOCK_METHOD1(OnPacketFeedbackVector,
|
||||||
|
void(std::vector<StreamPacketInfo> packet_feedback_vector));
|
||||||
|
};
|
||||||
|
|
||||||
|
RtpPacketSendInfo CreatePacket(uint32_t ssrc,
|
||||||
|
int16_t rtp_sequence_number,
|
||||||
|
int64_t transport_sequence_number) {
|
||||||
|
RtpPacketSendInfo res;
|
||||||
|
res.ssrc = ssrc;
|
||||||
|
res.transport_sequence_number = transport_sequence_number;
|
||||||
|
res.rtp_sequence_number = rtp_sequence_number;
|
||||||
|
res.has_rtp_sequence_number = true;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
TEST(TransportFeedbackDemuxerTest, ObserverSanity) {
|
||||||
|
TransportFeedbackDemuxer demuxer;
|
||||||
|
MockStreamFeedbackObserver mock;
|
||||||
|
demuxer.RegisterStreamFeedbackObserver({kSsrc}, &mock);
|
||||||
|
|
||||||
|
demuxer.AddPacket(CreatePacket(kSsrc, 55, 1));
|
||||||
|
demuxer.AddPacket(CreatePacket(kSsrc, 56, 2));
|
||||||
|
demuxer.AddPacket(CreatePacket(kSsrc, 57, 3));
|
||||||
|
|
||||||
|
rtcp::TransportFeedback feedback;
|
||||||
|
feedback.SetBase(1, 1000);
|
||||||
|
ASSERT_TRUE(feedback.AddReceivedPacket(1, 1000));
|
||||||
|
ASSERT_TRUE(feedback.AddReceivedPacket(2, 2000));
|
||||||
|
ASSERT_TRUE(feedback.AddReceivedPacket(3, 3000));
|
||||||
|
|
||||||
|
EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1);
|
||||||
|
demuxer.OnTransportFeedback(feedback);
|
||||||
|
|
||||||
|
demuxer.DeRegisterStreamFeedbackObserver(&mock);
|
||||||
|
|
||||||
|
demuxer.AddPacket(CreatePacket(kSsrc, 58, 4));
|
||||||
|
rtcp::TransportFeedback second_feedback;
|
||||||
|
second_feedback.SetBase(4, 4000);
|
||||||
|
ASSERT_TRUE(second_feedback.AddReceivedPacket(4, 4000));
|
||||||
|
|
||||||
|
EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0);
|
||||||
|
demuxer.OnTransportFeedback(second_feedback);
|
||||||
|
}
|
||||||
|
} // namespace webrtc
|
Reference in New Issue
Block a user