From 31d1bcef232206b82fd637c7452ffeccd6e55a91 Mon Sep 17 00:00:00 2001 From: Tommi Date: Tue, 27 Aug 2019 11:34:20 +0200 Subject: [PATCH] Fix deadlock in VideoSendStream tests, cause of flake on some bots. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: webrtc:10861, webrtc:10880 Change-Id: Ic3ff9fab420e1fd634f58ef86d2f8890e23cfd03 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/150220 Commit-Queue: Tommi Reviewed-by: Yves Gerey Reviewed-by: Sami Kalliomäki Cr-Commit-Position: refs/heads/master@{#28969} --- test/call_test.cc | 29 ++++- test/single_threaded_task_queue.cc | 46 ++++++-- test/single_threaded_task_queue.h | 8 ++ video/BUILD.gn | 1 + video/video_send_stream_tests.cc | 163 ++++++++++++++++------------- 5 files changed, 160 insertions(+), 87 deletions(-) diff --git a/test/call_test.cc b/test/call_test.cc index f7f85f6e05..9100ec4514 100644 --- a/test/call_test.cc +++ b/test/call_test.cc @@ -57,11 +57,17 @@ CallTest::CallTest() task_queue_("CallTestTaskQueue") {} CallTest::~CallTest() { - task_queue_.SendTask([this]() { - fake_send_audio_device_ = nullptr; - fake_recv_audio_device_ = nullptr; - video_sources_.clear(); - }); + // In most cases the task_queue_ should have been stopped by now, assuming + // the regular path of using CallTest to call PerformTest (followed by + // cleanup). However, there are some tests that don't use the class that way + // hence we need this special handling for cleaning up. + if (task_queue_.IsRunning()) { + task_queue_.SendTask([this]() { + fake_send_audio_device_ = nullptr; + fake_recv_audio_device_ = nullptr; + video_sources_.clear(); + }); + } } void CallTest::RegisterRtpExtension(const RtpExtension& extension) { @@ -194,10 +200,23 @@ void CallTest::RunBaseTest(BaseTest* test) { DestroyStreams(); send_transport_.reset(); receive_transport_.reset(); + frame_generator_capturer_ = nullptr; video_sources_.clear(); DestroyCalls(); + + fake_send_audio_device_ = nullptr; + fake_recv_audio_device_ = nullptr; }); + + // To avoid a race condition during destruction, which can happen while + // a derived class is being destructed but pending tasks might still run + // because the |task_queue_| is still in scope, we stop the TQ here. + // Note that tests should not be posting more tasks during teardown but + // as is, that's hard to control with the current test harness. E.g. transport + // classes continue to issue callbacks (e.g. OnSendRtp) during teardown, which + // can have a ripple effect. + task_queue_.Stop(); } void CallTest::CreateCalls() { diff --git a/test/single_threaded_task_queue.cc b/test/single_threaded_task_queue.cc index 3cba11748c..80db9f422c 100644 --- a/test/single_threaded_task_queue.cc +++ b/test/single_threaded_task_queue.cc @@ -37,13 +37,7 @@ SingleThreadedTaskQueueForTesting::SingleThreadedTaskQueueForTesting( } SingleThreadedTaskQueueForTesting::~SingleThreadedTaskQueueForTesting() { - RTC_DCHECK_RUN_ON(&owner_thread_checker_); - { - rtc::CritScope lock(&cs_); - running_ = false; - } - wake_up_.Set(); - thread_.Stop(); + Stop(); } SingleThreadedTaskQueueForTesting::TaskId @@ -57,6 +51,8 @@ SingleThreadedTaskQueueForTesting::PostDelayedTask(Task task, int64_t earliest_exec_time = rtc::TimeAfter(delay_ms); rtc::CritScope lock(&cs_); + if (!running_) + return kInvalidTaskId; TaskId id = next_task_id_++; @@ -82,10 +78,12 @@ SingleThreadedTaskQueueForTesting::PostDelayedTask(Task task, void SingleThreadedTaskQueueForTesting::SendTask(Task task) { RTC_DCHECK(!IsCurrent()); rtc::Event done; - PostTask([&task, &done]() { - task(); - done.Set(); - }); + if (PostTask([&task, &done]() { + task(); + done.Set(); + }) == kInvalidTaskId) { + return; + } // Give up after 30 seconds, warn after 10. RTC_CHECK(done.Wait(30000, 10000)); } @@ -105,6 +103,32 @@ bool SingleThreadedTaskQueueForTesting::IsCurrent() { return rtc::IsThreadRefEqual(thread_.GetThreadRef(), rtc::CurrentThreadRef()); } +bool SingleThreadedTaskQueueForTesting::IsRunning() { + RTC_DCHECK_RUN_ON(&owner_thread_checker_); + // We could check the |running_| flag here, but this is equivalent for the + // purposes of this function. + return thread_.IsRunning(); +} + +bool SingleThreadedTaskQueueForTesting::HasPendingTasks() const { + rtc::CritScope lock(&cs_); + return !tasks_.empty(); +} + +void SingleThreadedTaskQueueForTesting::Stop() { + RTC_DCHECK_RUN_ON(&owner_thread_checker_); + if (!thread_.IsRunning()) + return; + + { + rtc::CritScope lock(&cs_); + running_ = false; + } + + wake_up_.Set(); + thread_.Stop(); +} + void SingleThreadedTaskQueueForTesting::Run(void* obj) { static_cast(obj)->RunLoop(); } diff --git a/test/single_threaded_task_queue.h b/test/single_threaded_task_queue.h index efd14ce604..93eed6680c 100644 --- a/test/single_threaded_task_queue.h +++ b/test/single_threaded_task_queue.h @@ -32,6 +32,7 @@ class SingleThreadedTaskQueueForTesting { public: using Task = std::function; using TaskId = size_t; + constexpr static TaskId kInvalidTaskId = static_cast(-1); explicit SingleThreadedTaskQueueForTesting(const char* name); ~SingleThreadedTaskQueueForTesting(); @@ -59,6 +60,13 @@ class SingleThreadedTaskQueueForTesting { // Returns true iff called on the thread associated with the task queue. bool IsCurrent(); + // Returns true iff the task queue is actively being serviced. + bool IsRunning(); + + bool HasPendingTasks() const; + + void Stop(); + private: struct QueuedTask { QueuedTask(TaskId task_id, int64_t earliest_execution_time, Task task); diff --git a/video/BUILD.gn b/video/BUILD.gn index 634e40eda8..2e4cde294d 100644 --- a/video/BUILD.gn +++ b/video/BUILD.gn @@ -623,6 +623,7 @@ if (rtc_include_tests) { "../rtc_base:rtc_task_queue", "../rtc_base:task_queue_for_test", "../rtc_base/experiments:alr_experiment", + "../rtc_base/synchronization:sequence_checker", "../rtc_base/task_utils:to_queued_task", "../system_wrappers", "../system_wrappers:field_trial", diff --git a/video/video_send_stream_tests.cc b/video/video_send_stream_tests.cc index 8c6ff627b3..a1a91b4e5b 100644 --- a/video/video_send_stream_tests.cc +++ b/video/video_send_stream_tests.cc @@ -37,6 +37,7 @@ #include "rtc_base/logging.h" #include "rtc_base/platform_thread.h" #include "rtc_base/rate_limiter.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/time_utils.h" #include "rtc_base/unique_id_generator.h" #include "system_wrappers/include/sleep.h" @@ -1677,11 +1678,15 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) { : EndToEndTest(test::CallTest::kDefaultTimeoutMs), task_queue_(task_queue), call_(nullptr) { + module_process_thread_.Detach(); + task_queue_thread_.Detach(); EXPECT_TRUE(parser_->RegisterRtpHeaderExtension( kRtpExtensionTransportSequenceNumber, kExtensionId)); } void OnCallsCreated(Call* sender_call, Call* receiver_call) override { + RTC_DCHECK_RUN_ON(&task_queue_thread_); + RTC_DCHECK(!call_); call_ = sender_call; } @@ -1689,6 +1694,7 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) { VideoSendStream::Config* send_config, std::vector* receive_configs, VideoEncoderConfig* encoder_config) override { + RTC_DCHECK_RUN_ON(&task_queue_thread_); send_config->rtp.extensions.clear(); send_config->rtp.extensions.push_back(RtpExtension( RtpExtension::kTransportSequenceNumberUri, kExtensionId)); @@ -1699,6 +1705,7 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) { void ModifyAudioConfigs( AudioSendStream::Config* send_config, std::vector* receive_configs) override { + RTC_DCHECK_RUN_ON(&task_queue_thread_); send_config->rtp.extensions.clear(); send_config->rtp.extensions.push_back(RtpExtension( RtpExtension::kTransportSequenceNumberUri, kExtensionId)); @@ -1708,15 +1715,23 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) { } Action OnSendRtp(const uint8_t* packet, size_t length) override { - Call::Stats stats; - task_queue_->SendTask([this, &stats]() { stats = call_->GetStats(); }); - if (stats.send_bandwidth_bps > kStartBitrateBps) { - observation_complete_.Set(); - } - + RTC_DCHECK_RUN_ON(&module_process_thread_); + task_queue_->PostTask([this]() { + RTC_DCHECK_RUN_ON(&task_queue_thread_); + if (!call_) + return; + Call::Stats stats = call_->GetStats(); + if (stats.send_bandwidth_bps > kStartBitrateBps) + observation_complete_.Set(); + }); return SEND_PACKET; } + void OnStreamsStopped() override { + RTC_DCHECK_RUN_ON(&task_queue_thread_); + call_ = nullptr; + } + void PerformTest() override { rtc::NetworkRoute new_route; new_route.connected = true; @@ -1725,6 +1740,7 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) { BitrateConstraints bitrate_config; task_queue_->SendTask([this, &new_route, &bitrate_config]() { + RTC_DCHECK_RUN_ON(&task_queue_thread_); call_->GetTransportControllerSend()->OnNetworkRouteChanged("transport", new_route); bitrate_config.start_bitrate_bps = kStartBitrateBps; @@ -1736,6 +1752,7 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) { << "Timed out while waiting for start bitrate to be exceeded."; task_queue_->SendTask([this, &new_route, &bitrate_config]() { + RTC_DCHECK_RUN_ON(&task_queue_thread_); bitrate_config.start_bitrate_bps = -1; bitrate_config.max_bitrate_bps = kNewMaxBitrateBps; call_->GetTransportControllerSend()->SetSdpBitrateParameters( @@ -1750,8 +1767,10 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) { } private: + webrtc::SequenceChecker module_process_thread_; + webrtc::SequenceChecker task_queue_thread_; test::SingleThreadedTaskQueueForTesting* const task_queue_; - Call* call_; + Call* call_ RTC_GUARDED_BY(task_queue_thread_); } test(&task_queue_); RunBaseTest(&test); @@ -1839,29 +1858,21 @@ class MaxPaddingSetTest : public test::SendTest { T* stream_reset_fun, test::SingleThreadedTaskQueueForTesting* task_queue) : SendTest(test::CallTest::kDefaultTimeoutMs), - call_(nullptr), - send_stream_(nullptr), - send_stream_config_(nullptr), - packets_sent_(0), running_without_padding_(test_switch_content_type), stream_resetter_(stream_reset_fun), task_queue_(task_queue) { RTC_DCHECK(stream_resetter_); - } - - void OnVideoStreamsCreated( - VideoSendStream* send_stream, - const std::vector& receive_streams) override { - rtc::CritScope lock(&crit_); - send_stream_ = send_stream; + module_process_thread_.Detach(); + task_queue_thread_.Detach(); } void ModifyVideoConfigs( VideoSendStream::Config* send_config, std::vector* receive_configs, VideoEncoderConfig* encoder_config) override { + RTC_DCHECK_RUN_ON(&task_queue_thread_); RTC_DCHECK_EQ(1, encoder_config->number_of_streams); - if (RunningWithoutPadding()) { + if (running_without_padding_) { encoder_config->min_transmit_bitrate_bps = 0; encoder_config->content_type = VideoEncoderConfig::ContentType::kRealtimeVideo; @@ -1874,70 +1885,81 @@ class MaxPaddingSetTest : public test::SendTest { } void OnCallsCreated(Call* sender_call, Call* receiver_call) override { + RTC_DCHECK_RUN_ON(&task_queue_thread_); + RTC_DCHECK(task_queue_->IsCurrent()); + RTC_DCHECK(!call_); + RTC_DCHECK(sender_call); call_ = sender_call; } // Called on the pacer thread. Action OnSendRtp(const uint8_t* packet, size_t length) override { - // GetStats() needs to be called from the construction thread of call_. - Call::Stats stats; - task_queue_->SendTask([this, &stats]() { stats = call_->GetStats(); }); + RTC_DCHECK_RUN_ON(&module_process_thread_); - rtc::CritScope lock(&crit_); + // Check the stats on the correct thread and signal the 'complete' flag + // once we detect that we're done. - if (running_without_padding_) - EXPECT_EQ(0, stats.max_padding_bitrate_bps); + task_queue_->PostTask([this]() { + RTC_DCHECK_RUN_ON(&task_queue_thread_); + // In case we get a callback during teardown. + // When this happens, OnStreamsStopped() has been called already, + // |call_| is null and the streams are being torn down. + if (!call_) + return; - // Wait until at least kMinPacketsToSend frames have been encoded, so that - // we have reliable data. - if (++packets_sent_ < kMinPacketsToSend) - return SEND_PACKET; + ++packets_sent_; - if (running_without_padding_) { - // We've sent kMinPacketsToSend packets with default configuration, switch - // to enabling screen content and setting min transmit bitrate. - // Note that we need to recreate the stream if changing content type. - packets_sent_ = 0; - encoder_config_.min_transmit_bitrate_bps = kMinTransmitBitrateBps; - encoder_config_.content_type = VideoEncoderConfig::ContentType::kScreen; - running_without_padding_ = false; - content_switch_event_.Set(); - return SEND_PACKET; - } + Call::Stats stats = call_->GetStats(); + if (running_without_padding_) { + EXPECT_EQ(0, stats.max_padding_bitrate_bps); - // Make sure the pacer has been configured with a min transmit bitrate. - if (stats.max_padding_bitrate_bps > 0) - observation_complete_.Set(); + // Wait until at least kMinPacketsToSend frames have been encoded, so + // that we have reliable data. + if (packets_sent_ < kMinPacketsToSend) + return; + + // We've sent kMinPacketsToSend packets with default configuration, + // switch to enabling screen content and setting min transmit bitrate. + // Note that we need to recreate the stream if changing content type. + packets_sent_ = 0; + + encoder_config_.min_transmit_bitrate_bps = kMinTransmitBitrateBps; + encoder_config_.content_type = VideoEncoderConfig::ContentType::kScreen; + + running_without_padding_ = false; + (*stream_resetter_)(send_stream_config_, encoder_config_); + } else { + // Make sure the pacer has been configured with a min transmit bitrate. + if (stats.max_padding_bitrate_bps > 0) { + observation_complete_.Set(); + } + } + }); return SEND_PACKET; } - void PerformTest() override { - if (RunningWithoutPadding()) { - ASSERT_TRUE( - content_switch_event_.Wait(test::CallTest::kDefaultTimeoutMs)); - (*stream_resetter_)(send_stream_config_, encoder_config_); - } + // Called on |task_queue_| + void OnStreamsStopped() override { + RTC_DCHECK_RUN_ON(&task_queue_thread_); + RTC_DCHECK(task_queue_->IsCurrent()); + call_ = nullptr; + } + void PerformTest() override { ASSERT_TRUE(Wait()) << "Timed out waiting for a valid padding bitrate."; } private: - bool RunningWithoutPadding() const { - rtc::CritScope lock(&crit_); - return running_without_padding_; - } - - rtc::CriticalSection crit_; - rtc::Event content_switch_event_; - Call* call_; - VideoSendStream* send_stream_ RTC_GUARDED_BY(crit_); - VideoSendStream::Config send_stream_config_; + webrtc::SequenceChecker task_queue_thread_; + Call* call_ RTC_GUARDED_BY(task_queue_thread_) = nullptr; + VideoSendStream::Config send_stream_config_{nullptr}; VideoEncoderConfig encoder_config_; - uint32_t packets_sent_ RTC_GUARDED_BY(crit_); - bool running_without_padding_; + webrtc::SequenceChecker module_process_thread_; + uint32_t packets_sent_ RTC_GUARDED_BY(task_queue_thread_) = 0; + bool running_without_padding_ RTC_GUARDED_BY(task_queue_thread_); T* const stream_resetter_; - test::SingleThreadedTaskQueueForTesting* task_queue_; + test::SingleThreadedTaskQueueForTesting* const task_queue_; }; TEST_F(VideoSendStreamTest, RespectsMinTransmitBitrate) { @@ -1951,15 +1973,14 @@ TEST_F(VideoSendStreamTest, RespectsMinTransmitBitrateAfterContentSwitch) { // Function for removing and recreating the send stream with a new config. auto reset_fun = [this](const VideoSendStream::Config& send_stream_config, const VideoEncoderConfig& encoder_config) { - task_queue_.SendTask([this, &send_stream_config, &encoder_config]() { - Stop(); - DestroyVideoSendStreams(); - SetVideoSendConfig(send_stream_config); - SetVideoEncoderConfig(encoder_config); - CreateVideoSendStreams(); - SetVideoDegradation(DegradationPreference::MAINTAIN_RESOLUTION); - Start(); - }); + RTC_DCHECK(task_queue_.IsCurrent()); + Stop(); + DestroyVideoSendStreams(); + SetVideoSendConfig(send_stream_config); + SetVideoEncoderConfig(encoder_config); + CreateVideoSendStreams(); + SetVideoDegradation(DegradationPreference::MAINTAIN_RESOLUTION); + Start(); }; MaxPaddingSetTest test(true, &reset_fun, &task_queue_); RunBaseTest(&test);