Reland "(Un/)Subscribe RtpVideoSender for feedback on the transport queue."
This is a reland of 9d230d54c7eef31ac1100f0aeef1374dd1ac62fa Original change's description: > (Un/)Subscribe RtpVideoSender for feedback on the transport queue. > > * RtpVideoSender now registers/unregisters for feedback callback > inside of SetActive(), which runs on the transport queue. > * Transport feedback is given on the transport queue > * Registration/unregistration for feedback is done on the same > * Removed the last mutex from TransportFeedbackDemuxer. > > Ultimately, this work is related to moving state from the Call > class, that's related to network configuration, but due to the code > is currently written is attached to the worker thread, over to the > Transport, where it's used (e.g. suspended_video_send_ssrcs_). > > Bug: webrtc:13517, webrtc:11993 > Change-Id: I057d0e2597e6cb746b335e0308599cd547350e56 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248165 > Reviewed-by: Erik Språng <sprang@webrtc.org> > Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org> > Cr-Commit-Position: refs/heads/main@{#35777} Bug: webrtc:13517, webrtc:11993 Change-Id: I766e569abea8bae96d32267a951fcdc195ced8a7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249782 Reviewed-by: Stefan Holmer <stefan@webrtc.org> Reviewed-by: Erik Språng <sprang@webrtc.org> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org> Cr-Commit-Position: refs/heads/main@{#35863}
This commit is contained in:
@ -572,10 +572,10 @@ void RtpTransportControllerSend::OnAddPacket(
|
||||
|
||||
void RtpTransportControllerSend::OnTransportFeedback(
|
||||
const rtcp::TransportFeedback& feedback) {
|
||||
feedback_demuxer_.OnTransportFeedback(feedback);
|
||||
auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
|
||||
task_queue_.PostTask([this, feedback, feedback_time]() {
|
||||
RTC_DCHECK_RUN_ON(&task_queue_);
|
||||
feedback_demuxer_.OnTransportFeedback(feedback);
|
||||
absl::optional<TransportPacketsFeedback> feedback_msg =
|
||||
transport_feedback_adapter_.ProcessTransportFeedback(feedback,
|
||||
feedback_time);
|
||||
|
@ -445,9 +445,6 @@ RtpVideoSender::RtpVideoSender(
|
||||
fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled());
|
||||
|
||||
fec_controller_->SetProtectionCallback(this);
|
||||
// Signal congestion controller this object is ready for OnPacket* callbacks.
|
||||
transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver(
|
||||
rtp_config_.ssrcs, this);
|
||||
|
||||
// Construction happens on the worker thread (see Call::CreateVideoSendStream)
|
||||
// but subseqeuent calls to the RTP state will happen on one of two threads:
|
||||
@ -466,8 +463,8 @@ RtpVideoSender::~RtpVideoSender() {
|
||||
|
||||
SetActiveModulesLocked(
|
||||
std::vector<bool>(rtp_streams_.size(), /*active=*/false));
|
||||
transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver(
|
||||
this);
|
||||
|
||||
RTC_DCHECK(!registered_for_feedback_);
|
||||
}
|
||||
|
||||
void RtpVideoSender::SetActive(bool active) {
|
||||
@ -475,8 +472,18 @@ void RtpVideoSender::SetActive(bool active) {
|
||||
MutexLock lock(&mutex_);
|
||||
if (active_ == active)
|
||||
return;
|
||||
|
||||
const std::vector<bool> active_modules(rtp_streams_.size(), active);
|
||||
SetActiveModulesLocked(active_modules);
|
||||
|
||||
auto* feedback_provider = transport_->GetStreamFeedbackProvider();
|
||||
if (active && !registered_for_feedback_) {
|
||||
feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this);
|
||||
registered_for_feedback_ = true;
|
||||
} else if (!active && registered_for_feedback_) {
|
||||
feedback_provider->DeRegisterStreamFeedbackObserver(this);
|
||||
registered_for_feedback_ = false;
|
||||
}
|
||||
}
|
||||
|
||||
void RtpVideoSender::SetActiveModules(const std::vector<bool> active_modules) {
|
||||
|
@ -180,6 +180,7 @@ class RtpVideoSender : public RtpVideoSenderInterface,
|
||||
// transport task queue.
|
||||
mutable Mutex mutex_;
|
||||
bool active_ RTC_GUARDED_BY(mutex_);
|
||||
bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false;
|
||||
|
||||
const std::unique_ptr<FecController> fec_controller_;
|
||||
bool fec_allowed_ RTC_GUARDED_BY(mutex_);
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
#include "call/rtp_transport_controller_send.h"
|
||||
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
|
||||
@ -177,10 +178,33 @@ class RtpVideoSenderTestFixture {
|
||||
/*frame_count_observer=*/nullptr,
|
||||
/*frame_transformer=*/nullptr) {}
|
||||
|
||||
~RtpVideoSenderTestFixture() { SetActive(false); }
|
||||
|
||||
RtpVideoSender* router() { return router_.get(); }
|
||||
MockTransport& transport() { return transport_; }
|
||||
void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); }
|
||||
|
||||
void SetActive(bool active) {
|
||||
RunOnTransportQueue([&]() { router_->SetActive(active); });
|
||||
}
|
||||
|
||||
void SetActiveModules(const std::vector<bool>& active_modules) {
|
||||
RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); });
|
||||
}
|
||||
|
||||
// Several RtpVideoSender methods expect to be called on the task queue as
|
||||
// owned by the send transport. While the SequenceChecker may pick up the
|
||||
// default thread as the transport queue, explicit checks for the transport
|
||||
// queue (not just using a SequenceChecker) aren't possible unless such a
|
||||
// queue is actually active. So RunOnTransportQueue is a convenience function
|
||||
// that allow for running a closure on the transport queue, similar to
|
||||
// SendTask().
|
||||
template <typename Closure>
|
||||
void RunOnTransportQueue(Closure&& task) {
|
||||
transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
|
||||
AdvanceTime(TimeDelta::Millis(0));
|
||||
}
|
||||
|
||||
private:
|
||||
NiceMock<MockTransport> transport_;
|
||||
NiceMock<MockRtcpIntraFrameObserver> encoder_feedback_;
|
||||
@ -217,15 +241,15 @@ TEST(RtpVideoSenderTest, SendOnOneModule) {
|
||||
EXPECT_NE(EncodedImageCallback::Result::OK,
|
||||
test.router()->OnEncodedImage(encoded_image, nullptr).error);
|
||||
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
EXPECT_EQ(EncodedImageCallback::Result::OK,
|
||||
test.router()->OnEncodedImage(encoded_image, nullptr).error);
|
||||
|
||||
test.router()->SetActive(false);
|
||||
test.SetActive(false);
|
||||
EXPECT_NE(EncodedImageCallback::Result::OK,
|
||||
test.router()->OnEncodedImage(encoded_image, nullptr).error);
|
||||
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
EXPECT_EQ(EncodedImageCallback::Result::OK,
|
||||
test.router()->OnEncodedImage(encoded_image, nullptr).error);
|
||||
}
|
||||
@ -244,7 +268,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) {
|
||||
CodecSpecificInfo codec_info;
|
||||
codec_info.codecType = kVideoCodecVP8;
|
||||
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
EXPECT_EQ(EncodedImageCallback::Result::OK,
|
||||
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
|
||||
|
||||
@ -254,7 +278,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) {
|
||||
test.router()->OnEncodedImage(encoded_image_2, &codec_info).error);
|
||||
|
||||
// Inactive.
|
||||
test.router()->SetActive(false);
|
||||
test.SetActive(false);
|
||||
EXPECT_NE(EncodedImageCallback::Result::OK,
|
||||
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
|
||||
EXPECT_NE(EncodedImageCallback::Result::OK,
|
||||
@ -284,14 +308,14 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) {
|
||||
// Only setting one stream to active will still set the payload router to
|
||||
// active and allow sending data on the active stream.
|
||||
std::vector<bool> active_modules({true, false});
|
||||
test.router()->SetActiveModules(active_modules);
|
||||
test.SetActiveModules(active_modules);
|
||||
EXPECT_EQ(EncodedImageCallback::Result::OK,
|
||||
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
|
||||
|
||||
// Setting both streams to inactive will turn the payload router to
|
||||
// inactive.
|
||||
active_modules = {false, false};
|
||||
test.router()->SetActiveModules(active_modules);
|
||||
test.SetActiveModules(active_modules);
|
||||
// An incoming encoded image will not ask the module to send outgoing data
|
||||
// because the payload router is inactive.
|
||||
EXPECT_NE(EncodedImageCallback::Result::OK,
|
||||
@ -303,7 +327,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) {
|
||||
TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) {
|
||||
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
|
||||
kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
std::map<uint32_t, RtpPayloadState> initial_states =
|
||||
test.router()->GetRtpPayloadStates();
|
||||
@ -328,7 +352,7 @@ TEST(RtpVideoSenderTest, CreateWithPreviousStates) {
|
||||
|
||||
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
|
||||
kPayloadType, states);
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
std::map<uint32_t, RtpPayloadState> initial_states =
|
||||
test.router()->GetRtpPayloadStates();
|
||||
@ -368,7 +392,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) {
|
||||
test.router()->OnEncodedImage(encoded_image, nullptr).error);
|
||||
::testing::Mock::VerifyAndClearExpectations(&callback);
|
||||
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
FrameCounts frame_counts;
|
||||
EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1))
|
||||
@ -397,7 +421,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) {
|
||||
TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) {
|
||||
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
|
||||
kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
constexpr uint8_t kPayload = 'a';
|
||||
EncodedImage encoded_image;
|
||||
@ -496,8 +520,8 @@ TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) {
|
||||
}
|
||||
|
||||
// This tests that we utilize transport wide feedback to retransmit lost
|
||||
// packets. This is tested by dropping all ordirary packets from a "lossy"
|
||||
// stream send along with an secondary untouched stream. The transport wide
|
||||
// packets. This is tested by dropping all ordinary packets from a "lossy"
|
||||
// stream sent along with a secondary untouched stream. The transport wide
|
||||
// feedback packets from the secondary stream allows the sending side to
|
||||
// detect and retreansmit the lost packets from the lossy stream.
|
||||
TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
|
||||
@ -562,7 +586,7 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
|
||||
TEST(RtpVideoSenderTest, EarlyRetransmits) {
|
||||
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
|
||||
kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
const uint8_t kPayload[1] = {'a'};
|
||||
EncodedImage encoded_image;
|
||||
@ -657,7 +681,7 @@ TEST(RtpVideoSenderTest, EarlyRetransmits) {
|
||||
|
||||
TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) {
|
||||
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
RtpHeaderExtensionMap extensions;
|
||||
extensions.Register<RtpDependencyDescriptorExtension>(
|
||||
@ -717,7 +741,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) {
|
||||
|
||||
TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) {
|
||||
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
RtpHeaderExtensionMap extensions;
|
||||
extensions.Register<RtpDependencyDescriptorExtension>(
|
||||
@ -773,7 +797,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) {
|
||||
TEST(RtpVideoSenderTest,
|
||||
SupportsDependencyDescriptorForVp9NotProvidedByEncoder) {
|
||||
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
RtpHeaderExtensionMap extensions;
|
||||
extensions.Register<RtpDependencyDescriptorExtension>(
|
||||
@ -828,7 +852,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) {
|
||||
test::ScopedFieldTrials field_trials(
|
||||
"WebRTC-GenericCodecDependencyDescriptor/Enabled/");
|
||||
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
RtpHeaderExtensionMap extensions;
|
||||
extensions.Register<RtpDependencyDescriptorExtension>(
|
||||
@ -874,7 +898,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) {
|
||||
|
||||
TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) {
|
||||
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
RtpHeaderExtensionMap extensions;
|
||||
extensions.Register<RtpDependencyDescriptorExtension>(
|
||||
@ -932,7 +956,7 @@ TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) {
|
||||
TEST(RtpVideoSenderTest,
|
||||
SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) {
|
||||
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {});
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
RtpHeaderExtensionMap extensions;
|
||||
extensions.Register<RtpDependencyDescriptorExtension>(
|
||||
@ -1017,7 +1041,7 @@ TEST(RtpVideoSenderTest, OverheadIsSubtractedFromTargetBitrate) {
|
||||
kRtpHeaderSizeBytes + kTransportPacketOverheadBytes;
|
||||
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
|
||||
test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes);
|
||||
test.router()->SetActive(true);
|
||||
test.SetActive(true);
|
||||
|
||||
{
|
||||
test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000),
|
||||
|
@ -44,11 +44,7 @@ void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver(
|
||||
}
|
||||
|
||||
void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
|
||||
// Currently called on the send transport queue.
|
||||
// TODO(tommi): When registration/unregistration as well as
|
||||
// OnTransportFeedback callbacks occur on the transport queue, we can remove
|
||||
// this lock.
|
||||
MutexLock lock(&lock_);
|
||||
RTC_DCHECK_RUN_ON(&observer_checker_);
|
||||
|
||||
StreamFeedbackObserver::StreamPacketInfo info;
|
||||
info.ssrc = packet_info.media_ssrc;
|
||||
@ -66,9 +62,9 @@ void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
|
||||
|
||||
void TransportFeedbackDemuxer::OnTransportFeedback(
|
||||
const rtcp::TransportFeedback& feedback) {
|
||||
RTC_DCHECK_RUN_ON(&observer_checker_);
|
||||
|
||||
std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
|
||||
{
|
||||
MutexLock lock(&lock_);
|
||||
for (const auto& packet : feedback.GetAllPackets()) {
|
||||
int64_t seq_num =
|
||||
seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
|
||||
@ -81,9 +77,7 @@ void TransportFeedbackDemuxer::OnTransportFeedback(
|
||||
history_.erase(it);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RTC_DCHECK_RUN_ON(&observer_checker_);
|
||||
for (auto& observer : observers_) {
|
||||
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
|
||||
for (const auto& packet_info : stream_feedbacks) {
|
||||
|
@ -17,7 +17,6 @@
|
||||
#include "api/sequence_checker.h"
|
||||
#include "modules/include/module_common_types_public.h"
|
||||
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
|
||||
#include "rtc_base/synchronization/mutex.h"
|
||||
#include "rtc_base/system/no_unique_address.h"
|
||||
|
||||
namespace webrtc {
|
||||
@ -45,15 +44,14 @@ class TransportFeedbackDemuxer final : public StreamFeedbackProvider {
|
||||
void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
|
||||
|
||||
private:
|
||||
Mutex lock_;
|
||||
SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
|
||||
RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
|
||||
SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_);
|
||||
std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_
|
||||
RTC_GUARDED_BY(&lock_);
|
||||
RTC_GUARDED_BY(&observer_checker_);
|
||||
|
||||
// Maps a set of ssrcs to corresponding observer. Vectors are used rather than
|
||||
// set/map to ensure that the processing order is consistent independently of
|
||||
// the randomized ssrcs.
|
||||
RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
|
||||
std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
|
||||
observers_ RTC_GUARDED_BY(&observer_checker_);
|
||||
};
|
||||
|
Reference in New Issue
Block a user