diff --git a/video/end_to_end_tests/bandwidth_tests.cc b/video/end_to_end_tests/bandwidth_tests.cc index 163e84d4b4..4312c0e065 100644 --- a/video/end_to_end_tests/bandwidth_tests.cc +++ b/video/end_to_end_tests/bandwidth_tests.cc @@ -79,12 +79,14 @@ TEST_F(BandwidthEndToEndTest, ReceiveStreamSendsRemb) { class BandwidthStatsTest : public test::EndToEndTest { public: - explicit BandwidthStatsTest(bool send_side_bwe) + BandwidthStatsTest(bool send_side_bwe, + test::SingleThreadedTaskQueueForTesting* task_queue) : EndToEndTest(test::CallTest::kDefaultTimeoutMs), sender_call_(nullptr), receiver_call_(nullptr), has_seen_pacer_delay_(false), - send_side_bwe_(send_side_bwe) {} + send_side_bwe_(send_side_bwe), + task_queue_(task_queue) {} void ModifyVideoConfigs( VideoSendStream::Config* send_config, @@ -99,15 +101,22 @@ class BandwidthStatsTest : public test::EndToEndTest { } } + // Called on the pacer thread. Action OnSendRtp(const uint8_t* packet, size_t length) override { - Call::Stats sender_stats = sender_call_->GetStats(); - Call::Stats receiver_stats = receiver_call_->GetStats(); - if (!has_seen_pacer_delay_) - has_seen_pacer_delay_ = sender_stats.pacer_delay_ms > 0; - if (sender_stats.send_bandwidth_bps > 0 && has_seen_pacer_delay_) { - if (send_side_bwe_ || receiver_stats.recv_bandwidth_bps > 0) - observation_complete_.Set(); - } + // Stats need to be fetched on the thread where the caller objects were + // constructed. + task_queue_->PostTask([this]() { + Call::Stats sender_stats = sender_call_->GetStats(); + if (!has_seen_pacer_delay_) + has_seen_pacer_delay_ = sender_stats.pacer_delay_ms > 0; + + if (sender_stats.send_bandwidth_bps > 0 && has_seen_pacer_delay_) { + Call::Stats receiver_stats = receiver_call_->GetStats(); + if (send_side_bwe_ || receiver_stats.recv_bandwidth_bps > 0) + observation_complete_.Set(); + } + }); + return SEND_PACKET; } @@ -126,15 +135,16 @@ class BandwidthStatsTest : public test::EndToEndTest { Call* receiver_call_; bool has_seen_pacer_delay_; const bool send_side_bwe_; + test::SingleThreadedTaskQueueForTesting* const task_queue_; }; TEST_F(BandwidthEndToEndTest, VerifySendSideBweStats) { - BandwidthStatsTest test(true); + BandwidthStatsTest test(true, &task_queue_); RunBaseTest(&test); } TEST_F(BandwidthEndToEndTest, VerifyRecvSideBweStats) { - BandwidthStatsTest test(false); + BandwidthStatsTest test(false, &task_queue_); RunBaseTest(&test); } @@ -146,18 +156,16 @@ TEST_F(BandwidthEndToEndTest, VerifyRecvSideBweStats) { TEST_F(BandwidthEndToEndTest, RembWithSendSideBwe) { class BweObserver : public test::EndToEndTest { public: - BweObserver() + explicit BweObserver(test::SingleThreadedTaskQueueForTesting* task_queue) : EndToEndTest(kDefaultTimeoutMs), sender_call_(nullptr), clock_(Clock::GetRealTimeClock()), sender_ssrc_(0), remb_bitrate_bps_(1000000), receive_transport_(nullptr), - poller_thread_(&BitrateStatsPollingThread, - this, - "BitrateStatsPollingThread"), state_(kWaitForFirstRampUp), - retransmission_rate_limiter_(clock_, 1000) {} + retransmission_rate_limiter_(clock_, 1000), + task_queue_(task_queue) {} ~BweObserver() {} @@ -200,56 +208,52 @@ TEST_F(BandwidthEndToEndTest, RembWithSendSideBwe) { } void OnCallsCreated(Call* sender_call, Call* receiver_call) override { + RTC_DCHECK(sender_call); sender_call_ = sender_call; - } - - static void BitrateStatsPollingThread(void* obj) { - static_cast(obj)->PollStats(); + pending_task_ = task_queue_->PostTask([this]() { PollStats(); }); } void PollStats() { - do { - if (sender_call_) { - Call::Stats stats = sender_call_->GetStats(); - switch (state_) { - case kWaitForFirstRampUp: - if (stats.send_bandwidth_bps >= remb_bitrate_bps_) { - state_ = kWaitForRemb; - remb_bitrate_bps_ /= 2; - rtp_rtcp_->SetRemb( - remb_bitrate_bps_, - std::vector(&sender_ssrc_, &sender_ssrc_ + 1)); - rtp_rtcp_->SendRTCP(kRtcpRr); - } - break; - - case kWaitForRemb: - if (stats.send_bandwidth_bps == remb_bitrate_bps_) { - state_ = kWaitForSecondRampUp; - remb_bitrate_bps_ *= 2; - rtp_rtcp_->SetRemb( - remb_bitrate_bps_, - std::vector(&sender_ssrc_, &sender_ssrc_ + 1)); - rtp_rtcp_->SendRTCP(kRtcpRr); - } - break; - - case kWaitForSecondRampUp: - if (stats.send_bandwidth_bps == remb_bitrate_bps_) { - observation_complete_.Set(); - } - break; + pending_task_ = ~0; // for debugging purposes indicate no pending task. + Call::Stats stats = sender_call_->GetStats(); + switch (state_) { + case kWaitForFirstRampUp: + if (stats.send_bandwidth_bps >= remb_bitrate_bps_) { + state_ = kWaitForRemb; + remb_bitrate_bps_ /= 2; + rtp_rtcp_->SetRemb( + remb_bitrate_bps_, + std::vector(&sender_ssrc_, &sender_ssrc_ + 1)); + rtp_rtcp_->SendRTCP(kRtcpRr); } - } - } while (!stop_event_.Wait(1000)); + break; + + case kWaitForRemb: + if (stats.send_bandwidth_bps == remb_bitrate_bps_) { + state_ = kWaitForSecondRampUp; + remb_bitrate_bps_ *= 2; + rtp_rtcp_->SetRemb( + remb_bitrate_bps_, + std::vector(&sender_ssrc_, &sender_ssrc_ + 1)); + rtp_rtcp_->SendRTCP(kRtcpRr); + } + break; + + case kWaitForSecondRampUp: + if (stats.send_bandwidth_bps == remb_bitrate_bps_) { + observation_complete_.Set(); + return; + } + break; + } + + pending_task_ = + task_queue_->PostDelayedTask([this]() { PollStats(); }, 1000); } void PerformTest() override { - poller_thread_.Start(); EXPECT_TRUE(Wait()) << "Timed out while waiting for bitrate to change according to REMB."; - stop_event_.Set(); - poller_thread_.Stop(); } private: @@ -261,11 +265,11 @@ TEST_F(BandwidthEndToEndTest, RembWithSendSideBwe) { int remb_bitrate_bps_; std::unique_ptr rtp_rtcp_; test::PacketTransport* receive_transport_; - rtc::Event stop_event_; - rtc::PlatformThread poller_thread_; TestState state_; RateLimiter retransmission_rate_limiter_; - } test; + test::SingleThreadedTaskQueueForTesting* const task_queue_; + test::SingleThreadedTaskQueueForTesting::TaskId pending_task_ = ~0; + } test(&task_queue_); RunBaseTest(&test); }