(Un/)Subscribe RtpVideoSender for feedback on the transport queue.

* RtpVideoSender now registers/unregisters for feedback callback
  inside of SetActive(), which runs on the transport queue.
* Transport feedback is given on the transport queue
* Registration/unregistration for feedback is done on the same
* Removed the last mutex from TransportFeedbackDemuxer.

Ultimately, this work is related to moving state from the Call
class, that's related to network configuration, but due to the code
is currently written is attached to the worker thread, over to the
Transport, where it's used (e.g. suspended_video_send_ssrcs_).

Bug: webrtc:13517, webrtc:11993
Change-Id: I057d0e2597e6cb746b335e0308599cd547350e56
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248165
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35777}
This commit is contained in:
Tommi
2022-01-24 17:56:16 +01:00
committed by WebRTC LUCI CQ
parent 27e8a095bf
commit 9d230d54c7
6 changed files with 75 additions and 51 deletions

View File

@ -572,10 +572,10 @@ void RtpTransportControllerSend::OnAddPacket(
void RtpTransportControllerSend::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
feedback_demuxer_.OnTransportFeedback(feedback);
auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, feedback, feedback_time]() {
RTC_DCHECK_RUN_ON(&task_queue_);
feedback_demuxer_.OnTransportFeedback(feedback);
absl::optional<TransportPacketsFeedback> feedback_msg =
transport_feedback_adapter_.ProcessTransportFeedback(feedback,
feedback_time);

View File

@ -445,9 +445,6 @@ RtpVideoSender::RtpVideoSender(
fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled());
fec_controller_->SetProtectionCallback(this);
// Signal congestion controller this object is ready for OnPacket* callbacks.
transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver(
rtp_config_.ssrcs, this);
// Construction happens on the worker thread (see Call::CreateVideoSendStream)
// but subseqeuent calls to the RTP state will happen on one of two threads:
@ -466,8 +463,8 @@ RtpVideoSender::~RtpVideoSender() {
SetActiveModulesLocked(
std::vector<bool>(rtp_streams_.size(), /*active=*/false));
transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver(
this);
RTC_DCHECK(!registered_for_feedback_);
}
void RtpVideoSender::SetActive(bool active) {
@ -475,8 +472,18 @@ void RtpVideoSender::SetActive(bool active) {
MutexLock lock(&mutex_);
if (active_ == active)
return;
const std::vector<bool> active_modules(rtp_streams_.size(), active);
SetActiveModulesLocked(active_modules);
auto* feedback_provider = transport_->GetStreamFeedbackProvider();
if (active && !registered_for_feedback_) {
feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this);
registered_for_feedback_ = true;
} else if (!active && registered_for_feedback_) {
feedback_provider->DeRegisterStreamFeedbackObserver(this);
registered_for_feedback_ = false;
}
}
void RtpVideoSender::SetActiveModules(const std::vector<bool> active_modules) {

View File

@ -180,6 +180,7 @@ class RtpVideoSender : public RtpVideoSenderInterface,
// transport task queue.
mutable Mutex mutex_;
bool active_ RTC_GUARDED_BY(mutex_);
bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false;
const std::unique_ptr<FecController> fec_controller_;
bool fec_allowed_ RTC_GUARDED_BY(mutex_);

View File

@ -13,6 +13,7 @@
#include <atomic>
#include <memory>
#include <string>
#include <utility>
#include "call/rtp_transport_controller_send.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
@ -177,10 +178,33 @@ class RtpVideoSenderTestFixture {
/*frame_count_observer=*/nullptr,
/*frame_transformer=*/nullptr) {}
~RtpVideoSenderTestFixture() { SetActive(false); }
RtpVideoSender* router() { return router_.get(); }
MockTransport& transport() { return transport_; }
void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); }
void SetActive(bool active) {
RunOnTransportQueue([&]() { router_->SetActive(active); });
}
void SetActiveModules(const std::vector<bool>& active_modules) {
RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); });
}
// Several RtpVideoSender methods expect to be called on the task queue as
// owned by the send transport. While the SequenceChecker may pick up the
// default thread as the transport queue, explicit checks for the transport
// queue (not just using a SequenceChecker) aren't possible unless such a
// queue is actually active. So RunOnTransportQueue is a convenience function
// that allow for running a closure on the transport queue, similar to
// SendTask().
template <typename Closure>
void RunOnTransportQueue(Closure&& task) {
transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
AdvanceTime(TimeDelta::Millis(0));
}
private:
NiceMock<MockTransport> transport_;
NiceMock<MockRtcpIntraFrameObserver> encoder_feedback_;
@ -217,15 +241,15 @@ TEST(RtpVideoSenderTest, SendOnOneModule) {
EXPECT_NE(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr).error);
test.router()->SetActive(true);
test.SetActive(true);
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr).error);
test.router()->SetActive(false);
test.SetActive(false);
EXPECT_NE(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr).error);
test.router()->SetActive(true);
test.SetActive(true);
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image, nullptr).error);
}
@ -244,7 +268,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) {
CodecSpecificInfo codec_info;
codec_info.codecType = kVideoCodecVP8;
test.router()->SetActive(true);
test.SetActive(true);
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
@ -254,7 +278,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActive) {
test.router()->OnEncodedImage(encoded_image_2, &codec_info).error);
// Inactive.
test.router()->SetActive(false);
test.SetActive(false);
EXPECT_NE(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
EXPECT_NE(EncodedImageCallback::Result::OK,
@ -284,14 +308,14 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) {
// Only setting one stream to active will still set the payload router to
// active and allow sending data on the active stream.
std::vector<bool> active_modules({true, false});
test.router()->SetActiveModules(active_modules);
test.SetActiveModules(active_modules);
EXPECT_EQ(EncodedImageCallback::Result::OK,
test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
// Setting both streams to inactive will turn the payload router to
// inactive.
active_modules = {false, false};
test.router()->SetActiveModules(active_modules);
test.SetActiveModules(active_modules);
// An incoming encoded image will not ask the module to send outgoing data
// because the payload router is inactive.
EXPECT_NE(EncodedImageCallback::Result::OK,
@ -303,7 +327,7 @@ TEST(RtpVideoSenderTest, SendSimulcastSetActiveModules) {
TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
std::map<uint32_t, RtpPayloadState> initial_states =
test.router()->GetRtpPayloadStates();
@ -328,7 +352,7 @@ TEST(RtpVideoSenderTest, CreateWithPreviousStates) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, states);
test.router()->SetActive(true);
test.SetActive(true);
std::map<uint32_t, RtpPayloadState> initial_states =
test.router()->GetRtpPayloadStates();
@ -368,7 +392,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) {
test.router()->OnEncodedImage(encoded_image, nullptr).error);
::testing::Mock::VerifyAndClearExpectations(&callback);
test.router()->SetActive(true);
test.SetActive(true);
FrameCounts frame_counts;
EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1))
@ -397,7 +421,7 @@ TEST(RtpVideoSenderTest, FrameCountCallbacks) {
TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
constexpr uint8_t kPayload = 'a';
EncodedImage encoded_image;
@ -496,8 +520,8 @@ TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) {
}
// This tests that we utilize transport wide feedback to retransmit lost
// packets. This is tested by dropping all ordirary packets from a "lossy"
// stream send along with an secondary untouched stream. The transport wide
// packets. This is tested by dropping all ordinary packets from a "lossy"
// stream sent along with a secondary untouched stream. The transport wide
// feedback packets from the secondary stream allows the sending side to
// detect and retreansmit the lost packets from the lossy stream.
TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
@ -562,7 +586,7 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
TEST(RtpVideoSenderTest, EarlyRetransmits) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
const uint8_t kPayload[1] = {'a'};
EncodedImage encoded_image;
@ -657,7 +681,7 @@ TEST(RtpVideoSenderTest, EarlyRetransmits) {
TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) {
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@ -717,7 +741,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) {
TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) {
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@ -773,7 +797,7 @@ TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) {
TEST(RtpVideoSenderTest,
SupportsDependencyDescriptorForVp9NotProvidedByEncoder) {
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@ -828,7 +852,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) {
test::ScopedFieldTrials field_trials(
"WebRTC-GenericCodecDependencyDescriptor/Enabled/");
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@ -874,7 +898,7 @@ TEST(RtpVideoSenderTest, GenerateDependecyDescriptorForGenericCodecs) {
TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) {
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@ -932,7 +956,7 @@ TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) {
TEST(RtpVideoSenderTest,
SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) {
RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {});
test.router()->SetActive(true);
test.SetActive(true);
RtpHeaderExtensionMap extensions;
extensions.Register<RtpDependencyDescriptorExtension>(
@ -1017,7 +1041,7 @@ TEST(RtpVideoSenderTest, OverheadIsSubtractedFromTargetBitrate) {
kRtpHeaderSizeBytes + kTransportPacketOverheadBytes;
RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes);
test.router()->SetActive(true);
test.SetActive(true);
{
test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000),

View File

@ -44,11 +44,7 @@ void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver(
}
void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
// Currently called on the send transport queue.
// TODO(tommi): When registration/unregistration as well as
// OnTransportFeedback callbacks occur on the transport queue, we can remove
// this lock.
MutexLock lock(&lock_);
RTC_DCHECK_RUN_ON(&observer_checker_);
StreamFeedbackObserver::StreamPacketInfo info;
info.ssrc = packet_info.media_ssrc;
@ -66,24 +62,22 @@ void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
void TransportFeedbackDemuxer::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
RTC_DCHECK_RUN_ON(&observer_checker_);
std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
{
MutexLock lock(&lock_);
for (const auto& packet : feedback.GetAllPackets()) {
int64_t seq_num =
seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
auto it = history_.find(seq_num);
if (it != history_.end()) {
auto packet_info = it->second;
packet_info.received = packet.received();
stream_feedbacks.push_back(std::move(packet_info));
if (packet.received())
history_.erase(it);
}
for (const auto& packet : feedback.GetAllPackets()) {
int64_t seq_num =
seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
auto it = history_.find(seq_num);
if (it != history_.end()) {
auto packet_info = it->second;
packet_info.received = packet.received();
stream_feedbacks.push_back(std::move(packet_info));
if (packet.received())
history_.erase(it);
}
}
RTC_DCHECK_RUN_ON(&observer_checker_);
for (auto& observer : observers_) {
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
for (const auto& packet_info : stream_feedbacks) {

View File

@ -17,7 +17,6 @@
#include "api/sequence_checker.h"
#include "modules/include/module_common_types_public.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/system/no_unique_address.h"
namespace webrtc {
@ -45,15 +44,14 @@ class TransportFeedbackDemuxer final : public StreamFeedbackProvider {
void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
private:
Mutex lock_;
SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_);
std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_
RTC_GUARDED_BY(&lock_);
RTC_GUARDED_BY(&observer_checker_);
// Maps a set of ssrcs to corresponding observer. Vectors are used rather than
// set/map to ensure that the processing order is consistent independently of
// the randomized ssrcs.
RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
observers_ RTC_GUARDED_BY(&observer_checker_);
};