Fix deadlock in VideoSendStream tests, cause of flake on some bots.

Bug: webrtc:10861, webrtc:10880
Change-Id: Ic3ff9fab420e1fd634f58ef86d2f8890e23cfd03
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/150220
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Yves Gerey <yvesg@google.com>
Reviewed-by: Sami Kalliomäki <sakal@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28969}
This commit is contained in:
Tommi
2019-08-27 11:34:20 +02:00
committed by Commit Bot
parent 0c141c591a
commit 31d1bcef23
5 changed files with 160 additions and 87 deletions

View File

@ -57,11 +57,17 @@ CallTest::CallTest()
task_queue_("CallTestTaskQueue") {}
CallTest::~CallTest() {
task_queue_.SendTask([this]() {
fake_send_audio_device_ = nullptr;
fake_recv_audio_device_ = nullptr;
video_sources_.clear();
});
// In most cases the task_queue_ should have been stopped by now, assuming
// the regular path of using CallTest to call PerformTest (followed by
// cleanup). However, there are some tests that don't use the class that way
// hence we need this special handling for cleaning up.
if (task_queue_.IsRunning()) {
task_queue_.SendTask([this]() {
fake_send_audio_device_ = nullptr;
fake_recv_audio_device_ = nullptr;
video_sources_.clear();
});
}
}
void CallTest::RegisterRtpExtension(const RtpExtension& extension) {
@ -194,10 +200,23 @@ void CallTest::RunBaseTest(BaseTest* test) {
DestroyStreams();
send_transport_.reset();
receive_transport_.reset();
frame_generator_capturer_ = nullptr;
video_sources_.clear();
DestroyCalls();
fake_send_audio_device_ = nullptr;
fake_recv_audio_device_ = nullptr;
});
// To avoid a race condition during destruction, which can happen while
// a derived class is being destructed but pending tasks might still run
// because the |task_queue_| is still in scope, we stop the TQ here.
// Note that tests should not be posting more tasks during teardown but
// as is, that's hard to control with the current test harness. E.g. transport
// classes continue to issue callbacks (e.g. OnSendRtp) during teardown, which
// can have a ripple effect.
task_queue_.Stop();
}
void CallTest::CreateCalls() {

View File

@ -37,13 +37,7 @@ SingleThreadedTaskQueueForTesting::SingleThreadedTaskQueueForTesting(
}
SingleThreadedTaskQueueForTesting::~SingleThreadedTaskQueueForTesting() {
RTC_DCHECK_RUN_ON(&owner_thread_checker_);
{
rtc::CritScope lock(&cs_);
running_ = false;
}
wake_up_.Set();
thread_.Stop();
Stop();
}
SingleThreadedTaskQueueForTesting::TaskId
@ -57,6 +51,8 @@ SingleThreadedTaskQueueForTesting::PostDelayedTask(Task task,
int64_t earliest_exec_time = rtc::TimeAfter(delay_ms);
rtc::CritScope lock(&cs_);
if (!running_)
return kInvalidTaskId;
TaskId id = next_task_id_++;
@ -82,10 +78,12 @@ SingleThreadedTaskQueueForTesting::PostDelayedTask(Task task,
void SingleThreadedTaskQueueForTesting::SendTask(Task task) {
RTC_DCHECK(!IsCurrent());
rtc::Event done;
PostTask([&task, &done]() {
task();
done.Set();
});
if (PostTask([&task, &done]() {
task();
done.Set();
}) == kInvalidTaskId) {
return;
}
// Give up after 30 seconds, warn after 10.
RTC_CHECK(done.Wait(30000, 10000));
}
@ -105,6 +103,32 @@ bool SingleThreadedTaskQueueForTesting::IsCurrent() {
return rtc::IsThreadRefEqual(thread_.GetThreadRef(), rtc::CurrentThreadRef());
}
bool SingleThreadedTaskQueueForTesting::IsRunning() {
RTC_DCHECK_RUN_ON(&owner_thread_checker_);
// We could check the |running_| flag here, but this is equivalent for the
// purposes of this function.
return thread_.IsRunning();
}
bool SingleThreadedTaskQueueForTesting::HasPendingTasks() const {
rtc::CritScope lock(&cs_);
return !tasks_.empty();
}
void SingleThreadedTaskQueueForTesting::Stop() {
RTC_DCHECK_RUN_ON(&owner_thread_checker_);
if (!thread_.IsRunning())
return;
{
rtc::CritScope lock(&cs_);
running_ = false;
}
wake_up_.Set();
thread_.Stop();
}
void SingleThreadedTaskQueueForTesting::Run(void* obj) {
static_cast<SingleThreadedTaskQueueForTesting*>(obj)->RunLoop();
}

View File

@ -32,6 +32,7 @@ class SingleThreadedTaskQueueForTesting {
public:
using Task = std::function<void()>;
using TaskId = size_t;
constexpr static TaskId kInvalidTaskId = static_cast<TaskId>(-1);
explicit SingleThreadedTaskQueueForTesting(const char* name);
~SingleThreadedTaskQueueForTesting();
@ -59,6 +60,13 @@ class SingleThreadedTaskQueueForTesting {
// Returns true iff called on the thread associated with the task queue.
bool IsCurrent();
// Returns true iff the task queue is actively being serviced.
bool IsRunning();
bool HasPendingTasks() const;
void Stop();
private:
struct QueuedTask {
QueuedTask(TaskId task_id, int64_t earliest_execution_time, Task task);

View File

@ -623,6 +623,7 @@ if (rtc_include_tests) {
"../rtc_base:rtc_task_queue",
"../rtc_base:task_queue_for_test",
"../rtc_base/experiments:alr_experiment",
"../rtc_base/synchronization:sequence_checker",
"../rtc_base/task_utils:to_queued_task",
"../system_wrappers",
"../system_wrappers:field_trial",

View File

@ -37,6 +37,7 @@
#include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/rate_limiter.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/time_utils.h"
#include "rtc_base/unique_id_generator.h"
#include "system_wrappers/include/sleep.h"
@ -1677,11 +1678,15 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) {
: EndToEndTest(test::CallTest::kDefaultTimeoutMs),
task_queue_(task_queue),
call_(nullptr) {
module_process_thread_.Detach();
task_queue_thread_.Detach();
EXPECT_TRUE(parser_->RegisterRtpHeaderExtension(
kRtpExtensionTransportSequenceNumber, kExtensionId));
}
void OnCallsCreated(Call* sender_call, Call* receiver_call) override {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
RTC_DCHECK(!call_);
call_ = sender_call;
}
@ -1689,6 +1694,7 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) {
VideoSendStream::Config* send_config,
std::vector<VideoReceiveStream::Config>* receive_configs,
VideoEncoderConfig* encoder_config) override {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
send_config->rtp.extensions.clear();
send_config->rtp.extensions.push_back(RtpExtension(
RtpExtension::kTransportSequenceNumberUri, kExtensionId));
@ -1699,6 +1705,7 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) {
void ModifyAudioConfigs(
AudioSendStream::Config* send_config,
std::vector<AudioReceiveStream::Config>* receive_configs) override {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
send_config->rtp.extensions.clear();
send_config->rtp.extensions.push_back(RtpExtension(
RtpExtension::kTransportSequenceNumberUri, kExtensionId));
@ -1708,15 +1715,23 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) {
}
Action OnSendRtp(const uint8_t* packet, size_t length) override {
Call::Stats stats;
task_queue_->SendTask([this, &stats]() { stats = call_->GetStats(); });
if (stats.send_bandwidth_bps > kStartBitrateBps) {
observation_complete_.Set();
}
RTC_DCHECK_RUN_ON(&module_process_thread_);
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
if (!call_)
return;
Call::Stats stats = call_->GetStats();
if (stats.send_bandwidth_bps > kStartBitrateBps)
observation_complete_.Set();
});
return SEND_PACKET;
}
void OnStreamsStopped() override {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
call_ = nullptr;
}
void PerformTest() override {
rtc::NetworkRoute new_route;
new_route.connected = true;
@ -1725,6 +1740,7 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) {
BitrateConstraints bitrate_config;
task_queue_->SendTask([this, &new_route, &bitrate_config]() {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
call_->GetTransportControllerSend()->OnNetworkRouteChanged("transport",
new_route);
bitrate_config.start_bitrate_bps = kStartBitrateBps;
@ -1736,6 +1752,7 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) {
<< "Timed out while waiting for start bitrate to be exceeded.";
task_queue_->SendTask([this, &new_route, &bitrate_config]() {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
bitrate_config.start_bitrate_bps = -1;
bitrate_config.max_bitrate_bps = kNewMaxBitrateBps;
call_->GetTransportControllerSend()->SetSdpBitrateParameters(
@ -1750,8 +1767,10 @@ TEST_F(VideoSendStreamTest, ChangingNetworkRoute) {
}
private:
webrtc::SequenceChecker module_process_thread_;
webrtc::SequenceChecker task_queue_thread_;
test::SingleThreadedTaskQueueForTesting* const task_queue_;
Call* call_;
Call* call_ RTC_GUARDED_BY(task_queue_thread_);
} test(&task_queue_);
RunBaseTest(&test);
@ -1839,29 +1858,21 @@ class MaxPaddingSetTest : public test::SendTest {
T* stream_reset_fun,
test::SingleThreadedTaskQueueForTesting* task_queue)
: SendTest(test::CallTest::kDefaultTimeoutMs),
call_(nullptr),
send_stream_(nullptr),
send_stream_config_(nullptr),
packets_sent_(0),
running_without_padding_(test_switch_content_type),
stream_resetter_(stream_reset_fun),
task_queue_(task_queue) {
RTC_DCHECK(stream_resetter_);
}
void OnVideoStreamsCreated(
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
rtc::CritScope lock(&crit_);
send_stream_ = send_stream;
module_process_thread_.Detach();
task_queue_thread_.Detach();
}
void ModifyVideoConfigs(
VideoSendStream::Config* send_config,
std::vector<VideoReceiveStream::Config>* receive_configs,
VideoEncoderConfig* encoder_config) override {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
RTC_DCHECK_EQ(1, encoder_config->number_of_streams);
if (RunningWithoutPadding()) {
if (running_without_padding_) {
encoder_config->min_transmit_bitrate_bps = 0;
encoder_config->content_type =
VideoEncoderConfig::ContentType::kRealtimeVideo;
@ -1874,70 +1885,81 @@ class MaxPaddingSetTest : public test::SendTest {
}
void OnCallsCreated(Call* sender_call, Call* receiver_call) override {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
RTC_DCHECK(task_queue_->IsCurrent());
RTC_DCHECK(!call_);
RTC_DCHECK(sender_call);
call_ = sender_call;
}
// Called on the pacer thread.
Action OnSendRtp(const uint8_t* packet, size_t length) override {
// GetStats() needs to be called from the construction thread of call_.
Call::Stats stats;
task_queue_->SendTask([this, &stats]() { stats = call_->GetStats(); });
RTC_DCHECK_RUN_ON(&module_process_thread_);
rtc::CritScope lock(&crit_);
// Check the stats on the correct thread and signal the 'complete' flag
// once we detect that we're done.
if (running_without_padding_)
EXPECT_EQ(0, stats.max_padding_bitrate_bps);
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
// In case we get a callback during teardown.
// When this happens, OnStreamsStopped() has been called already,
// |call_| is null and the streams are being torn down.
if (!call_)
return;
// Wait until at least kMinPacketsToSend frames have been encoded, so that
// we have reliable data.
if (++packets_sent_ < kMinPacketsToSend)
return SEND_PACKET;
++packets_sent_;
if (running_without_padding_) {
// We've sent kMinPacketsToSend packets with default configuration, switch
// to enabling screen content and setting min transmit bitrate.
// Note that we need to recreate the stream if changing content type.
packets_sent_ = 0;
encoder_config_.min_transmit_bitrate_bps = kMinTransmitBitrateBps;
encoder_config_.content_type = VideoEncoderConfig::ContentType::kScreen;
running_without_padding_ = false;
content_switch_event_.Set();
return SEND_PACKET;
}
Call::Stats stats = call_->GetStats();
if (running_without_padding_) {
EXPECT_EQ(0, stats.max_padding_bitrate_bps);
// Make sure the pacer has been configured with a min transmit bitrate.
if (stats.max_padding_bitrate_bps > 0)
observation_complete_.Set();
// Wait until at least kMinPacketsToSend frames have been encoded, so
// that we have reliable data.
if (packets_sent_ < kMinPacketsToSend)
return;
// We've sent kMinPacketsToSend packets with default configuration,
// switch to enabling screen content and setting min transmit bitrate.
// Note that we need to recreate the stream if changing content type.
packets_sent_ = 0;
encoder_config_.min_transmit_bitrate_bps = kMinTransmitBitrateBps;
encoder_config_.content_type = VideoEncoderConfig::ContentType::kScreen;
running_without_padding_ = false;
(*stream_resetter_)(send_stream_config_, encoder_config_);
} else {
// Make sure the pacer has been configured with a min transmit bitrate.
if (stats.max_padding_bitrate_bps > 0) {
observation_complete_.Set();
}
}
});
return SEND_PACKET;
}
void PerformTest() override {
if (RunningWithoutPadding()) {
ASSERT_TRUE(
content_switch_event_.Wait(test::CallTest::kDefaultTimeoutMs));
(*stream_resetter_)(send_stream_config_, encoder_config_);
}
// Called on |task_queue_|
void OnStreamsStopped() override {
RTC_DCHECK_RUN_ON(&task_queue_thread_);
RTC_DCHECK(task_queue_->IsCurrent());
call_ = nullptr;
}
void PerformTest() override {
ASSERT_TRUE(Wait()) << "Timed out waiting for a valid padding bitrate.";
}
private:
bool RunningWithoutPadding() const {
rtc::CritScope lock(&crit_);
return running_without_padding_;
}
rtc::CriticalSection crit_;
rtc::Event content_switch_event_;
Call* call_;
VideoSendStream* send_stream_ RTC_GUARDED_BY(crit_);
VideoSendStream::Config send_stream_config_;
webrtc::SequenceChecker task_queue_thread_;
Call* call_ RTC_GUARDED_BY(task_queue_thread_) = nullptr;
VideoSendStream::Config send_stream_config_{nullptr};
VideoEncoderConfig encoder_config_;
uint32_t packets_sent_ RTC_GUARDED_BY(crit_);
bool running_without_padding_;
webrtc::SequenceChecker module_process_thread_;
uint32_t packets_sent_ RTC_GUARDED_BY(task_queue_thread_) = 0;
bool running_without_padding_ RTC_GUARDED_BY(task_queue_thread_);
T* const stream_resetter_;
test::SingleThreadedTaskQueueForTesting* task_queue_;
test::SingleThreadedTaskQueueForTesting* const task_queue_;
};
TEST_F(VideoSendStreamTest, RespectsMinTransmitBitrate) {
@ -1951,15 +1973,14 @@ TEST_F(VideoSendStreamTest, RespectsMinTransmitBitrateAfterContentSwitch) {
// Function for removing and recreating the send stream with a new config.
auto reset_fun = [this](const VideoSendStream::Config& send_stream_config,
const VideoEncoderConfig& encoder_config) {
task_queue_.SendTask([this, &send_stream_config, &encoder_config]() {
Stop();
DestroyVideoSendStreams();
SetVideoSendConfig(send_stream_config);
SetVideoEncoderConfig(encoder_config);
CreateVideoSendStreams();
SetVideoDegradation(DegradationPreference::MAINTAIN_RESOLUTION);
Start();
});
RTC_DCHECK(task_queue_.IsCurrent());
Stop();
DestroyVideoSendStreams();
SetVideoSendConfig(send_stream_config);
SetVideoEncoderConfig(encoder_config);
CreateVideoSendStreams();
SetVideoDegradation(DegradationPreference::MAINTAIN_RESOLUTION);
Start();
};
MaxPaddingSetTest<decltype(reset_fun)> test(true, &reset_fun, &task_queue_);
RunBaseTest(&test);