Always use dedicated queue for frame transformation

The current logic has risks if the encoder queue changes, for example when switching from a hardware encoder to a software encoder, or when switching from two different software implementations. Always using a dedicated task queue simplifies the code and is safer.

Bug: chromium:1395308
Change-Id: I0a576ed2f6e892955e0a519567969474d3b99efd
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/285882
Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38793}
This commit is contained in:
Evan Shrubsole
2022-12-02 10:03:44 +00:00
committed by WebRTC LUCI CQ
parent 9e099b62a1
commit 3fcd49e972
3 changed files with 8 additions and 112 deletions

View File

@ -18,7 +18,6 @@
#include "modules/rtp_rtcp/source/rtp_descriptor_authentication.h" #include "modules/rtp_rtcp/source/rtp_descriptor_authentication.h"
#include "modules/rtp_rtcp/source/rtp_sender_video.h" #include "modules/rtp_rtcp/source/rtp_sender_video.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h"
namespace webrtc { namespace webrtc {
namespace { namespace {
@ -104,33 +103,15 @@ RTPSenderVideoFrameTransformerDelegate::RTPSenderVideoFrameTransformerDelegate(
: sender_(sender), : sender_(sender),
frame_transformer_(std::move(frame_transformer)), frame_transformer_(std::move(frame_transformer)),
ssrc_(ssrc), ssrc_(ssrc),
task_queue_factory_(task_queue_factory) { transformation_queue_(task_queue_factory->CreateTaskQueue(
RTC_DCHECK(task_queue_factory_); "video_frame_transformer",
} TaskQueueFactory::Priority::NORMAL)) {}
void RTPSenderVideoFrameTransformerDelegate::Init() { void RTPSenderVideoFrameTransformerDelegate::Init() {
frame_transformer_->RegisterTransformedFrameSinkCallback( frame_transformer_->RegisterTransformedFrameSinkCallback(
rtc::scoped_refptr<TransformedFrameCallback>(this), ssrc_); rtc::scoped_refptr<TransformedFrameCallback>(this), ssrc_);
} }
void RTPSenderVideoFrameTransformerDelegate::EnsureEncoderQueueCreated() {
TaskQueueBase* current = TaskQueueBase::Current();
if (!encoder_queue_) {
// Save the current task queue to post the transformed frame for sending
// once it is transformed. When there is no current task queue, i.e.
// encoding is done on an external thread (for example in the case of
// hardware encoders), create a new task queue.
if (current) {
encoder_queue_ = current;
} else {
owned_encoder_queue_ = task_queue_factory_->CreateTaskQueue(
"video_frame_transformer", TaskQueueFactory::Priority::NORMAL);
encoder_queue_ = owned_encoder_queue_.get();
}
}
}
bool RTPSenderVideoFrameTransformerDelegate::TransformFrame( bool RTPSenderVideoFrameTransformerDelegate::TransformFrame(
int payload_type, int payload_type,
absl::optional<VideoCodecType> codec_type, absl::optional<VideoCodecType> codec_type,
@ -138,18 +119,6 @@ bool RTPSenderVideoFrameTransformerDelegate::TransformFrame(
const EncodedImage& encoded_image, const EncodedImage& encoded_image,
RTPVideoHeader video_header, RTPVideoHeader video_header,
absl::optional<int64_t> expected_retransmission_time_ms) { absl::optional<int64_t> expected_retransmission_time_ms) {
EnsureEncoderQueueCreated();
TaskQueueBase* current = TaskQueueBase::Current();
// DCHECK that the current queue does not change, or if does then it was due
// to a hardware encoder fallback and thus there is an owned queue.
RTC_DCHECK(!current || current == encoder_queue_ || owned_encoder_queue_)
<< "Current thread must either be an external thread (nullptr) or be the "
"same as the previous encoder queue. The current thread is "
<< (current ? "non-null" : "nullptr") << " and the encoder thread is "
<< (current == encoder_queue_ ? "the same queue."
: "not the same queue.");
frame_transformer_->Transform(std::make_unique<TransformableVideoSenderFrame>( frame_transformer_->Transform(std::make_unique<TransformableVideoSenderFrame>(
encoded_image, video_header, payload_type, codec_type, rtp_timestamp, encoded_image, video_header, payload_type, codec_type, rtp_timestamp,
expected_retransmission_time_ms, ssrc_)); expected_retransmission_time_ms, ssrc_));
@ -160,22 +129,20 @@ void RTPSenderVideoFrameTransformerDelegate::OnTransformedFrame(
std::unique_ptr<TransformableFrameInterface> frame) { std::unique_ptr<TransformableFrameInterface> frame) {
MutexLock lock(&sender_lock_); MutexLock lock(&sender_lock_);
EnsureEncoderQueueCreated();
if (!sender_) { if (!sender_) {
return; return;
} }
rtc::scoped_refptr<RTPSenderVideoFrameTransformerDelegate> delegate(this); rtc::scoped_refptr<RTPSenderVideoFrameTransformerDelegate> delegate(this);
encoder_queue_->PostTask( transformation_queue_->PostTask(
[delegate = std::move(delegate), frame = std::move(frame)]() mutable { [delegate = std::move(delegate), frame = std::move(frame)]() mutable {
RTC_DCHECK_RUN_ON(delegate->encoder_queue_); RTC_DCHECK_RUN_ON(delegate->transformation_queue_.get());
delegate->SendVideo(std::move(frame)); delegate->SendVideo(std::move(frame));
}); });
} }
void RTPSenderVideoFrameTransformerDelegate::SendVideo( void RTPSenderVideoFrameTransformerDelegate::SendVideo(
std::unique_ptr<TransformableFrameInterface> transformed_frame) const { std::unique_ptr<TransformableFrameInterface> transformed_frame) const {
RTC_DCHECK_RUN_ON(encoder_queue_); RTC_DCHECK_RUN_ON(transformation_queue_.get());
RTC_CHECK_EQ(transformed_frame->GetDirection(), RTC_CHECK_EQ(transformed_frame->GetDirection(),
TransformableFrameInterface::Direction::kSender); TransformableFrameInterface::Direction::kSender);
MutexLock lock(&sender_lock_); MutexLock lock(&sender_lock_);

View File

@ -53,7 +53,7 @@ class RTPSenderVideoFrameTransformerDelegate : public TransformedFrameCallback {
// Delegates the call to RTPSendVideo::SendVideo on the `encoder_queue_`. // Delegates the call to RTPSendVideo::SendVideo on the `encoder_queue_`.
void SendVideo(std::unique_ptr<TransformableFrameInterface> frame) const void SendVideo(std::unique_ptr<TransformableFrameInterface> frame) const
RTC_RUN_ON(encoder_queue_); RTC_RUN_ON(transformation_queue_);
// Delegates the call to RTPSendVideo::SetVideoStructureAfterTransformation // Delegates the call to RTPSendVideo::SetVideoStructureAfterTransformation
// under `sender_lock_`. // under `sender_lock_`.
@ -80,11 +80,9 @@ class RTPSenderVideoFrameTransformerDelegate : public TransformedFrameCallback {
RTPSenderVideo* sender_ RTC_GUARDED_BY(sender_lock_); RTPSenderVideo* sender_ RTC_GUARDED_BY(sender_lock_);
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer_; rtc::scoped_refptr<FrameTransformerInterface> frame_transformer_;
const uint32_t ssrc_; const uint32_t ssrc_;
TaskQueueBase* encoder_queue_ = nullptr;
TaskQueueFactory* task_queue_factory_;
// Used when the encoded frames arrives without a current task queue. This can // Used when the encoded frames arrives without a current task queue. This can
// happen if a hardware encoder was used. // happen if a hardware encoder was used.
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> owned_encoder_queue_; std::unique_ptr<TaskQueueBase, TaskQueueDeleter> transformation_queue_;
}; };
// Method to support cloning a Sender frame from another frame // Method to support cloning a Sender frame from another frame

View File

@ -1526,75 +1526,6 @@ TEST_F(RtpSenderVideoWithFrameTransformerTest, OnTransformedFrameSendsVideo) {
EXPECT_EQ(transport_.packets_sent(), 2); EXPECT_EQ(transport_.packets_sent(), 2);
} }
// Task queue which behaves as if it was a hardware encoder thread where no
// CurrentTaskQueue is set.
class HardwareEncoderTaskQueue : public TaskQueueBase {
public:
HardwareEncoderTaskQueue() = default;
void Delete() override {}
void PostTask(absl::AnyInvocable<void() &&> task) override {
CurrentTaskQueueSetter null_setter(nullptr);
std::move(task)();
}
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
// Not implemented.
RTC_CHECK_NOTREACHED();
}
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
// Not implemented.
RTC_CHECK_NOTREACHED();
}
};
TEST_F(RtpSenderVideoWithFrameTransformerTest,
OnTransformedFrameSendsVideoOnNewQueueForHwEncoders) {
auto mock_frame_transformer =
rtc::make_ref_counted<NiceMock<MockFrameTransformer>>();
rtc::scoped_refptr<TransformedFrameCallback> callback;
EXPECT_CALL(*mock_frame_transformer, RegisterTransformedFrameSinkCallback)
.WillOnce(SaveArg<0>(&callback));
std::unique_ptr<RTPSenderVideo> rtp_sender_video =
CreateSenderWithFrameTransformer(mock_frame_transformer);
ASSERT_TRUE(callback);
auto encoded_image = CreateDefaultEncodedImage();
RTPVideoHeader video_header;
video_header.frame_type = VideoFrameType::kVideoFrameKey;
ON_CALL(*mock_frame_transformer, Transform)
.WillByDefault(
[&callback](std::unique_ptr<TransformableFrameInterface> frame) {
callback->OnTransformedFrame(std::move(frame));
});
// Hardware encoder task queue has no TaskQueue::Current() set, and so a new
// task queue should be created to handle the callback.
HardwareEncoderTaskQueue hw_encoder_queue;
hw_encoder_queue.PostTask([&] {
rtp_sender_video->SendEncodedImage(kPayload, kType, kTimestamp,
*encoded_image, video_header,
kDefaultExpectedRetransmissionTimeMs);
});
// No packets sent yet since a task should be posted onto a new task queue.
EXPECT_EQ(transport_.packets_sent(), 0);
time_controller_.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(transport_.packets_sent(), 1);
// Check software encoder fallback.
auto encoder_queue = time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"encoder_queue", TaskQueueFactory::Priority::NORMAL);
encoder_queue->PostTask([&] {
rtp_sender_video->SendEncodedImage(kPayload, kType, kTimestamp,
*encoded_image, video_header,
kDefaultExpectedRetransmissionTimeMs);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(transport_.packets_sent(), 2);
}
TEST_F(RtpSenderVideoWithFrameTransformerTest, TEST_F(RtpSenderVideoWithFrameTransformerTest,
TransformableFrameMetadataHasCorrectValue) { TransformableFrameMetadataHasCorrectValue) {
auto mock_frame_transformer = auto mock_frame_transformer =