Have RTPSenderVideoFrameTransformerDelegate use new TQ for HW encoders

Instead of re-using the sender task queue, a new task queue will
suffice.

Bug: webrtc:14445
Change-Id: Ia7395ace2f0bb66bf9e76e3783b208f2cd0385dc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/275771
Commit-Queue: Evan Shrubsole <eshr@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38332}
This commit is contained in:
Evan Shrubsole
2022-10-07 14:22:44 +00:00
committed by WebRTC LUCI CQ
parent 88b8581a46
commit 9b643d4a49
12 changed files with 157 additions and 40 deletions

View File

@ -184,6 +184,7 @@ rtc_library("rtp_sender") {
"../api:sequence_checker",
"../api:transport_api",
"../api/rtc_event_log",
"../api/task_queue",
"../api/transport:field_trial_based_config",
"../api/transport:goog_cc",
"../api/transport:network_control",

View File

@ -91,6 +91,7 @@ RtpTransportControllerSend::RtpTransportControllerSend(
const FieldTrialsView& trials)
: clock_(clock),
event_log_(event_log),
task_queue_factory_(task_queue_factory),
bitrate_configurator_(bitrate_config),
pacer_started_(false),
pacer_settings_(trials),
@ -157,7 +158,7 @@ RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
this, event_log, &retransmission_rate_limiter_, std::move(fec_controller),
frame_encryption_config.frame_encryptor,
frame_encryption_config.crypto_options, std::move(frame_transformer),
field_trials_));
field_trials_, task_queue_factory_));
return video_rtp_senders_.back().get();
}

View File

@ -20,6 +20,7 @@
#include "absl/strings/string_view.h"
#include "api/network_state_predictor.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/transport/network_control.h"
#include "api/units/data_rate.h"
#include "call/rtp_bitrate_configurator.h"
@ -154,6 +155,7 @@ class RtpTransportControllerSend final
Clock* const clock_;
RtcEventLog* const event_log_;
TaskQueueFactory* const task_queue_factory_;
SequenceChecker main_thread_;
PacketRouter packet_router_;
std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_

View File

@ -19,6 +19,7 @@
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "api/array_view.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/transport/field_trial_based_config.h"
#include "api/video_codecs/video_codec.h"
#include "call/rtp_transport_controller_send_interface.h"
@ -198,8 +199,10 @@ std::vector<RtpStreamSender> CreateRtpStreamSenders(
FrameEncryptorInterface* frame_encryptor,
const CryptoOptions& crypto_options,
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer,
const FieldTrialsView& trials) {
const FieldTrialsView& trials,
TaskQueueFactory* task_queue_factory) {
RTC_DCHECK_GT(rtp_config.ssrcs.size(), 0);
RTC_DCHECK(task_queue_factory);
RtpRtcpInterface::Configuration configuration;
configuration.clock = clock;
@ -290,7 +293,7 @@ std::vector<RtpStreamSender> CreateRtpStreamSenders(
video_config.fec_overhead_bytes = fec_generator->MaxPacketOverhead();
}
video_config.frame_transformer = frame_transformer;
video_config.send_transport_queue = transport->GetWorkerQueue()->Get();
video_config.task_queue_factory = task_queue_factory;
auto sender_video = std::make_unique<RTPSenderVideo>(video_config);
rtp_streams.emplace_back(std::move(rtp_rtcp), std::move(sender_video),
std::move(fec_generator));
@ -368,7 +371,8 @@ RtpVideoSender::RtpVideoSender(
FrameEncryptorInterface* frame_encryptor,
const CryptoOptions& crypto_options,
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer,
const FieldTrialsView& field_trials)
const FieldTrialsView& field_trials,
TaskQueueFactory* task_queue_factory)
: field_trials_(field_trials),
send_side_bwe_with_overhead_(!absl::StartsWith(
field_trials_.Lookup("WebRTC-SendSideBwe-WithOverhead"),
@ -393,7 +397,8 @@ RtpVideoSender::RtpVideoSender(
frame_encryptor,
crypto_options,
std::move(frame_transformer),
field_trials_)),
field_trials_,
task_queue_factory)),
rtp_config_(rtp_config),
codec_type_(GetVideoCodecType(rtp_config)),
transport_(transport),

View File

@ -24,6 +24,8 @@
#include "api/field_trials_view.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/video_codecs/video_encoder.h"
#include "call/rtp_config.h"
#include "call/rtp_payload_params.h"
@ -86,7 +88,8 @@ class RtpVideoSender : public RtpVideoSenderInterface,
FrameEncryptorInterface* frame_encryptor,
const CryptoOptions& crypto_options, // move inside RtpTransport
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer,
const FieldTrialsView& field_trials);
const FieldTrialsView& field_trials,
TaskQueueFactory* task_queue_factory);
~RtpVideoSender() override;
RtpVideoSender(const RtpVideoSender&) = delete;

View File

@ -153,7 +153,8 @@ class RtpVideoSenderTestFixture {
&transport_controller_, &event_log_, &retransmission_rate_limiter_,
std::make_unique<FecControllerDefault>(time_controller_.GetClock()),
nullptr, CryptoOptions{}, frame_transformer,
field_trials ? *field_trials : field_trials_);
field_trials ? *field_trials : field_trials_,
time_controller_.GetTaskQueueFactory());
}
RtpVideoSenderTestFixture(

View File

@ -611,6 +611,7 @@ if (rtc_include_tests) {
"../../api:time_controller",
"../../api:transport_api",
"../../api/rtc_event_log",
"../../api/task_queue",
"../../api/transport:field_trial_based_config",
"../../api/transport/rtp:dependency_descriptor",
"../../api/units:data_rate",

View File

@ -171,7 +171,7 @@ RTPSenderVideo::RTPSenderVideo(const Config& config)
this,
config.frame_transformer,
rtp_sender_->SSRC(),
config.send_transport_queue)
config.task_queue_factory)
: nullptr),
include_capture_clock_offset_(!absl::StartsWith(
config.field_trials->Lookup(kIncludeCaptureClockOffset),

View File

@ -22,6 +22,7 @@
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/transport/rtp/dependency_descriptor.h"
#include "api/video/video_codec_type.h"
#include "api/video/video_frame_type.h"
@ -81,7 +82,7 @@ class RTPSenderVideo {
absl::optional<int> red_payload_type;
const FieldTrialsView* field_trials = nullptr;
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer;
TaskQueueBase* send_transport_queue = nullptr;
TaskQueueFactory* task_queue_factory = nullptr;
};
explicit RTPSenderVideo(const Config& config);

View File

@ -13,9 +13,11 @@
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_factory.h"
#include "modules/rtp_rtcp/source/rtp_descriptor_authentication.h"
#include "modules/rtp_rtcp/source/rtp_sender_video.h"
#include "rtc_base/checks.h"
namespace webrtc {
namespace {
@ -97,11 +99,13 @@ RTPSenderVideoFrameTransformerDelegate::RTPSenderVideoFrameTransformerDelegate(
RTPSenderVideo* sender,
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer,
uint32_t ssrc,
TaskQueueBase* send_transport_queue)
TaskQueueFactory* task_queue_factory)
: sender_(sender),
frame_transformer_(std::move(frame_transformer)),
ssrc_(ssrc),
send_transport_queue_(send_transport_queue) {}
task_queue_factory_(task_queue_factory) {
RTC_DCHECK(task_queue_factory_);
}
void RTPSenderVideoFrameTransformerDelegate::Init() {
frame_transformer_->RegisterTransformedFrameSinkCallback(
@ -115,14 +119,29 @@ bool RTPSenderVideoFrameTransformerDelegate::TransformFrame(
const EncodedImage& encoded_image,
RTPVideoHeader video_header,
absl::optional<int64_t> expected_retransmission_time_ms) {
TaskQueueBase* current = TaskQueueBase::Current();
if (!encoder_queue_) {
// Save the current task queue to post the transformed frame for sending
// once it is transformed. When there is no current task queue, i.e.
// encoding is done on an external thread (for example in the case of
// hardware encoders), use the send transport queue instead.
TaskQueueBase* current = TaskQueueBase::Current();
encoder_queue_ = current ? current : send_transport_queue_;
// hardware encoders), create a new task queue.
if (current) {
encoder_queue_ = current;
} else {
owned_encoder_queue_ = task_queue_factory_->CreateTaskQueue(
"video_frame_transformer", TaskQueueFactory::Priority::NORMAL);
encoder_queue_ = owned_encoder_queue_.get();
}
}
// DCHECK that the current queue does not change, or if does then it was due
// to a hardware encoder fallback and thus there is an owned queue.
RTC_DCHECK(!current || current == encoder_queue_ || owned_encoder_queue_)
<< "Current thread must either be an external thread (nullptr) or be the "
"same as the previous encoder queue. The current thread is "
<< (current ? "non-null" : "nullptr") << " and the encoder thread is "
<< (current == encoder_queue_ ? "the same queue."
: "not the same queue.");
frame_transformer_->Transform(std::make_unique<TransformableVideoSenderFrame>(
encoded_image, video_header, payload_type, codec_type, rtp_timestamp,
expected_retransmission_time_ms, ssrc_));
@ -141,13 +160,14 @@ void RTPSenderVideoFrameTransformerDelegate::OnTransformedFrame(
rtc::scoped_refptr<RTPSenderVideoFrameTransformerDelegate> delegate(this);
encoder_queue_->PostTask(
[delegate = std::move(delegate), frame = std::move(frame)]() mutable {
RTC_DCHECK_RUN_ON(delegate->encoder_queue_);
delegate->SendVideo(std::move(frame));
});
}
void RTPSenderVideoFrameTransformerDelegate::SendVideo(
std::unique_ptr<TransformableFrameInterface> transformed_frame) const {
RTC_CHECK(encoder_queue_->IsCurrent());
RTC_DCHECK_RUN_ON(encoder_queue_);
RTC_CHECK_EQ(transformed_frame->GetDirection(),
TransformableFrameInterface::Direction::kSender);
MutexLock lock(&sender_lock_);
@ -160,8 +180,7 @@ void RTPSenderVideoFrameTransformerDelegate::SendVideo(
transformed_video_frame->GetCodecType(),
transformed_video_frame->GetTimestamp(),
transformed_video_frame->GetCaptureTimeMs(),
transformed_video_frame->GetData(),
transformed_video_frame->GetHeader(),
transformed_video_frame->GetData(), transformed_video_frame->GetHeader(),
transformed_video_frame->GetExpectedRetransmissionTimeMs());
}

View File

@ -15,7 +15,9 @@
#include "api/frame_transformer_interface.h"
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/video/video_layers_allocation.h"
#include "rtc_base/synchronization/mutex.h"
@ -32,7 +34,7 @@ class RTPSenderVideoFrameTransformerDelegate : public TransformedFrameCallback {
RTPSenderVideo* sender,
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer,
uint32_t ssrc,
TaskQueueBase* send_transport_queue);
TaskQueueFactory* send_transport_queue);
void Init();
@ -50,7 +52,8 @@ class RTPSenderVideoFrameTransformerDelegate : public TransformedFrameCallback {
std::unique_ptr<TransformableFrameInterface> frame) override;
// Delegates the call to RTPSendVideo::SendVideo on the `encoder_queue_`.
void SendVideo(std::unique_ptr<TransformableFrameInterface> frame) const;
void SendVideo(std::unique_ptr<TransformableFrameInterface> frame) const
RTC_RUN_ON(encoder_queue_);
// Delegates the call to RTPSendVideo::SetVideoStructureAfterTransformation
// under `sender_lock_`.
@ -76,7 +79,10 @@ class RTPSenderVideoFrameTransformerDelegate : public TransformedFrameCallback {
rtc::scoped_refptr<FrameTransformerInterface> frame_transformer_;
const uint32_t ssrc_;
TaskQueueBase* encoder_queue_ = nullptr;
TaskQueueBase* send_transport_queue_;
TaskQueueFactory* task_queue_factory_;
// Used when the encoded frames arrives without a current task queue. This can
// happen if a hardware encoder was used.
std::unique_ptr<TaskQueueBase, TaskQueueDeleter> owned_encoder_queue_;
};
} // namespace webrtc

View File

@ -17,17 +17,17 @@
#include "absl/memory/memory.h"
#include "api/rtp_headers.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/test/mock_frame_encryptor.h"
#include "api/transport/field_trial_based_config.h"
#include "api/transport/rtp/dependency_descriptor.h"
#include "api/units/timestamp.h"
#include "api/video/video_codec_constants.h"
#include "api/video/video_timing.h"
#include "common_video/generic_frame_descriptor/generic_frame_info.h"
#include "modules/rtp_rtcp/include/rtp_cvo.h"
#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_dependency_descriptor_extension.h"
#include "modules/rtp_rtcp/source/rtp_descriptor_authentication.h"
#include "modules/rtp_rtcp/source/rtp_format_video_generic.h"
#include "modules/rtp_rtcp/source/rtp_generic_frame_descriptor.h"
#include "modules/rtp_rtcp/source/rtp_generic_frame_descriptor_extension.h"
@ -36,12 +36,13 @@
#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
#include "modules/rtp_rtcp/source/rtp_video_layers_allocation_extension.h"
#include "rtc_base/arraysize.h"
#include "rtc_base/checks.h"
#include "rtc_base/rate_limiter.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/thread.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/mock_frame_transformer.h"
#include "test/time_controller/simulated_time_controller.h"
namespace webrtc {
@ -78,7 +79,7 @@ constexpr uint32_t kTimestamp = 10;
constexpr uint16_t kSeqNum = 33;
constexpr uint32_t kSsrc = 725242;
constexpr int kMaxPacketLength = 1500;
constexpr uint64_t kStartTime = 123456789;
constexpr Timestamp kStartTime = Timestamp::Millis(123456789);
constexpr int64_t kDefaultExpectedRetransmissionTimeMs = 125;
class LoopbackTransportTest : public webrtc::Transport {
@ -1392,11 +1393,11 @@ INSTANTIATE_TEST_SUITE_P(WithAndWithoutOverhead,
class RtpSenderVideoWithFrameTransformerTest : public ::testing::Test {
public:
RtpSenderVideoWithFrameTransformerTest()
: fake_clock_(kStartTime),
retransmission_rate_limiter_(&fake_clock_, 1000),
: time_controller_(kStartTime),
retransmission_rate_limiter_(time_controller_.GetClock(), 1000),
rtp_module_(ModuleRtpRtcpImpl2::Create([&] {
RtpRtcpInterface::Configuration config;
config.clock = &fake_clock_;
config.clock = time_controller_.GetClock();
config.outgoing_transport = &transport_;
config.retransmission_rate_limiter = &retransmission_rate_limiter_;
config.field_trials = &field_trials_;
@ -1410,17 +1411,17 @@ class RtpSenderVideoWithFrameTransformerTest : public ::testing::Test {
std::unique_ptr<RTPSenderVideo> CreateSenderWithFrameTransformer(
rtc::scoped_refptr<FrameTransformerInterface> transformer) {
RTPSenderVideo::Config config;
config.clock = &fake_clock_;
config.clock = time_controller_.GetClock();
config.rtp_sender = rtp_module_->RtpSender();
config.field_trials = &field_trials_;
config.frame_transformer = transformer;
config.task_queue_factory = time_controller_.GetTaskQueueFactory();
return std::make_unique<RTPSenderVideo>(config);
}
protected:
rtc::AutoThread main_thread_;
GlobalSimulatedTimeController time_controller_;
FieldTrialBasedConfig field_trials_;
SimulatedClock fake_clock_;
LoopbackTransportTest transport_;
RateLimiter retransmission_rate_limiter_;
std::unique_ptr<ModuleRtpRtcpImpl2> rtp_module_;
@ -1514,15 +1515,91 @@ TEST_F(RtpSenderVideoWithFrameTransformerTest, OnTransformedFrameSendsVideo) {
[&callback](std::unique_ptr<TransformableFrameInterface> frame) {
callback->OnTransformedFrame(std::move(frame));
});
TaskQueueForTest encoder_queue;
encoder_queue.SendTask(
[&] {
rtp_sender_video->SendEncodedImage(
kPayload, kType, kTimestamp, *encoded_image, video_header,
kDefaultExpectedRetransmissionTimeMs);
});
encoder_queue.WaitForPreviouslyPostedTasks();
auto encoder_queue = time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"encoder_queue", TaskQueueFactory::Priority::NORMAL);
encoder_queue->PostTask([&] {
rtp_sender_video->SendEncodedImage(kPayload, kType, kTimestamp,
*encoded_image, video_header,
kDefaultExpectedRetransmissionTimeMs);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(transport_.packets_sent(), 1);
encoder_queue->PostTask([&] {
rtp_sender_video->SendEncodedImage(kPayload, kType, kTimestamp,
*encoded_image, video_header,
kDefaultExpectedRetransmissionTimeMs);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(transport_.packets_sent(), 2);
}
// Task queue which behaves as if it was a hardware encoder thread where no
// CurrentTaskQueue is set.
class HardwareEncoderTaskQueue : public TaskQueueBase {
public:
HardwareEncoderTaskQueue() = default;
void Delete() override {}
void PostTask(absl::AnyInvocable<void() &&> task) override {
CurrentTaskQueueSetter null_setter(nullptr);
std::move(task)();
}
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
// Not implemented.
RTC_CHECK_NOTREACHED();
}
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
TimeDelta delay) override {
// Not implemented.
RTC_CHECK_NOTREACHED();
}
};
TEST_F(RtpSenderVideoWithFrameTransformerTest,
OnTransformedFrameSendsVideoOnNewQueueForHwEncoders) {
auto mock_frame_transformer =
rtc::make_ref_counted<NiceMock<MockFrameTransformer>>();
rtc::scoped_refptr<TransformedFrameCallback> callback;
EXPECT_CALL(*mock_frame_transformer, RegisterTransformedFrameSinkCallback)
.WillOnce(SaveArg<0>(&callback));
std::unique_ptr<RTPSenderVideo> rtp_sender_video =
CreateSenderWithFrameTransformer(mock_frame_transformer);
ASSERT_TRUE(callback);
auto encoded_image = CreateDefaultEncodedImage();
RTPVideoHeader video_header;
video_header.frame_type = VideoFrameType::kVideoFrameKey;
ON_CALL(*mock_frame_transformer, Transform)
.WillByDefault(
[&callback](std::unique_ptr<TransformableFrameInterface> frame) {
callback->OnTransformedFrame(std::move(frame));
});
// Hardware encoder task queue has no TaskQueue::Current() set, and so a new
// task queue should be created to handle the callback.
HardwareEncoderTaskQueue hw_encoder_queue;
hw_encoder_queue.PostTask([&] {
rtp_sender_video->SendEncodedImage(kPayload, kType, kTimestamp,
*encoded_image, video_header,
kDefaultExpectedRetransmissionTimeMs);
});
// No packets sent yet since a task should be posted onto a new task queue.
EXPECT_EQ(transport_.packets_sent(), 0);
time_controller_.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(transport_.packets_sent(), 1);
// Check software encoder fallback.
auto encoder_queue = time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
"encoder_queue", TaskQueueFactory::Priority::NORMAL);
encoder_queue->PostTask([&] {
rtp_sender_video->SendEncodedImage(kPayload, kType, kTimestamp,
*encoded_image, video_header,
kDefaultExpectedRetransmissionTimeMs);
});
time_controller_.AdvanceTime(TimeDelta::Zero());
EXPECT_EQ(transport_.packets_sent(), 2);
}
TEST_F(RtpSenderVideoWithFrameTransformerTest,