Revert "Preparation for ReceiveStatisticsProxy lock reduction."

This reverts commit 24eed2735b2135227bcfefbabf34a89f9a5fec99.

Reason for revert: Speculative revert: breaks downstream project

Original change's description:
> Preparation for ReceiveStatisticsProxy lock reduction.
> 
> Update tests to call VideoReceiveStream::GetStats() in the same or at
> least similar way it gets called in production (construction thread,
> same TQ/thread).
> 
> Mapped out threads and context for ReceiveStatisticsProxy,
> VideoQualityObserver and VideoReceiveStream. Added
> follow-up TODOs for webrtc:11489.
> 
> One functional change in ReceiveStatisticsProxy is that when sender
> side RtcpPacketTypesCounterUpdated calls are made, the counter is
> updated asynchronously since the sender calls the method on a different
> thread than the receiver.
> 
> Make CallClient::SendTask public to allow tests to run tasks in the
> right context. CallClient already does this internally for GetStats.
> 
> Remove 10 sec sleep in StopSendingKeyframeRequestsForInactiveStream.
> 
> Bug: webrtc:11489
> Change-Id: Ib45bfc59d8472e9c5ea556e6ecf38298b8f14921
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/172847
> Commit-Queue: Tommi <tommi@webrtc.org>
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Reviewed-by: Magnus Flodman <mflodman@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#31008}

TBR=mbonadei@webrtc.org,henrika@webrtc.org,kwiberg@webrtc.org,tommi@webrtc.org,juberti@webrtc.org,mflodman@webrtc.org

# Not skipping CQ checks because original CL landed > 1 day ago.

Bug: webrtc:11489
Change-Id: I48b8359cdb791bf22b1a2c2c43d46263b01e0d65
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/173082
Reviewed-by: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31023}
This commit is contained in:
Artem Titov
2020-04-07 18:02:39 +00:00
committed by Mirko Bonadei
parent 7e60483915
commit 16cc9efd54
19 changed files with 100 additions and 655 deletions

View File

@ -547,7 +547,6 @@ if (rtc_include_tests) {
"rtc_base:weak_ptr_unittests",
"rtc_base/experiments:experiments_unittests",
"rtc_base/synchronization:sequence_checker_unittests",
"rtc_base/task_utils:pending_task_safety_flag_unittests",
"rtc_base/task_utils:to_queued_task_unittests",
"sdk:sdk_tests",
"test:rtp_test_utils",

View File

@ -96,24 +96,21 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
static const int kMinRunTimeMs = 30000;
public:
explicit VideoRtcpAndSyncObserver(TaskQueueBase* task_queue,
Clock* clock,
const std::string& test_label)
explicit VideoRtcpAndSyncObserver(Clock* clock, const std::string& test_label)
: test::RtpRtcpObserver(CallPerfTest::kLongTimeoutMs),
clock_(clock),
test_label_(test_label),
creation_time_ms_(clock_->TimeInMilliseconds()),
task_queue_(task_queue) {}
first_time_in_sync_(-1),
receive_stream_(nullptr) {}
void OnFrame(const VideoFrame& video_frame) override {
task_queue_->PostTask(ToQueuedTask([this]() { CheckStats(); }));
}
void CheckStats() {
if (!receive_stream_)
return;
VideoReceiveStream::Stats stats = receive_stream_->GetStats();
VideoReceiveStream::Stats stats;
{
rtc::CritScope lock(&crit_);
if (receive_stream_)
stats = receive_stream_->GetStats();
}
if (stats.sync_offset_ms == std::numeric_limits<int>::max())
return;
@ -138,8 +135,7 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
}
void set_receive_stream(VideoReceiveStream* receive_stream) {
RTC_DCHECK_EQ(task_queue_, TaskQueueBase::Current());
// Note that receive_stream may be nullptr.
rtc::CritScope lock(&crit_);
receive_stream_ = receive_stream;
}
@ -152,10 +148,10 @@ class VideoRtcpAndSyncObserver : public test::RtpRtcpObserver,
Clock* const clock_;
std::string test_label_;
const int64_t creation_time_ms_;
int64_t first_time_in_sync_ = -1;
VideoReceiveStream* receive_stream_ = nullptr;
int64_t first_time_in_sync_;
rtc::CriticalSection crit_;
VideoReceiveStream* receive_stream_ RTC_GUARDED_BY(crit_);
std::vector<double> sync_offset_ms_list_;
TaskQueueBase* const task_queue_;
};
void CallPerfTest::TestAudioVideoSync(FecMode fec,
@ -172,8 +168,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_net_config.queue_delay_ms = 500;
audio_net_config.loss_percent = 5;
auto observer = std::make_unique<VideoRtcpAndSyncObserver>(
task_queue(), Clock::GetRealTimeClock(), test_label);
VideoRtcpAndSyncObserver observer(Clock::GetRealTimeClock(), test_label);
std::map<uint8_t, MediaType> audio_pt_map;
std::map<uint8_t, MediaType> video_pt_map;
@ -223,7 +218,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
});
audio_send_transport = std::make_unique<test::PacketTransport>(
task_queue(), sender_call_.get(), observer.get(),
task_queue(), sender_call_.get(), &observer,
test::PacketTransport::kSender, audio_pt_map,
std::make_unique<FakeNetworkPipe>(
Clock::GetRealTimeClock(),
@ -231,7 +226,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_send_transport->SetReceiver(receiver_call_->Receiver());
video_send_transport = std::make_unique<test::PacketTransport>(
task_queue(), sender_call_.get(), observer.get(),
task_queue(), sender_call_.get(), &observer,
test::PacketTransport::kSender, video_pt_map,
std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
std::make_unique<SimulatedNetwork>(
@ -239,7 +234,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
video_send_transport->SetReceiver(receiver_call_->Receiver());
receive_transport = std::make_unique<test::PacketTransport>(
task_queue(), receiver_call_.get(), observer.get(),
task_queue(), receiver_call_.get(), &observer,
test::PacketTransport::kReceiver, payload_type_map_,
std::make_unique<FakeNetworkPipe>(Clock::GetRealTimeClock(),
std::make_unique<SimulatedNetwork>(
@ -264,7 +259,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
video_receive_configs_[0].rtp.ulpfec_payload_type = kUlpfecPayloadType;
}
video_receive_configs_[0].rtp.nack.rtp_history_ms = 1000;
video_receive_configs_[0].renderer = observer.get();
video_receive_configs_[0].renderer = &observer;
video_receive_configs_[0].sync_group = kSyncGroup;
AudioReceiveStream::Config audio_recv_config;
@ -286,7 +281,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
receiver_call_->CreateAudioReceiveStream(audio_recv_config);
}
EXPECT_EQ(1u, video_receive_streams_.size());
observer->set_receive_stream(video_receive_streams_[0]);
observer.set_receive_stream(video_receive_streams_[0]);
drifting_clock = std::make_unique<DriftingClock>(clock_, video_ntp_speed);
CreateFrameGeneratorCapturerWithDrift(drifting_clock.get(), video_rtp_speed,
kDefaultFramerate, kDefaultWidth,
@ -298,13 +293,10 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
audio_receive_stream->Start();
});
EXPECT_TRUE(observer->Wait())
EXPECT_TRUE(observer.Wait())
<< "Timed out while waiting for audio and video to be synchronized.";
SendTask(RTC_FROM_HERE, task_queue(), [&]() {
// Clear the pointer to the receive stream since it will now be deleted.
observer->set_receive_stream(nullptr);
audio_send_stream->Stop();
audio_receive_stream->Stop();
@ -322,7 +314,7 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
DestroyCalls();
});
observer->PrintResults();
observer.PrintResults();
// In quick test synchronization may not be achieved in time.
if (!field_trial::IsEnabled("WebRTC-QuickPerfTest")) {
@ -331,9 +323,6 @@ void CallPerfTest::TestAudioVideoSync(FecMode fec,
EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.AVSyncOffsetInMs"));
#endif
}
task_queue()->PostTask(
ToQueuedTask([to_delete = observer.release()]() { delete to_delete; }));
}
TEST_F(CallPerfTest, PlaysOutAudioAndVideoInSyncWithoutClockDrift) {

View File

@ -510,9 +510,9 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
test::NetworkSimulationConfig net_conf;
net_conf.bandwidth = DataRate::KilobitsPerSec(300);
auto send_node = s.CreateSimulationNode(net_conf);
auto* callee = s.CreateClient("return", call_conf);
auto* route = s.CreateRoutes(s.CreateClient("send", call_conf), {send_node},
callee, {s.CreateSimulationNode(net_conf)});
s.CreateClient("return", call_conf),
{s.CreateSimulationNode(net_conf)});
test::VideoStreamConfig lossy_config;
lossy_config.source.framerate = 5;
@ -540,20 +540,14 @@ TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
// from initial probing.
s.RunFor(TimeDelta::Seconds(1));
rtx_packets = 0;
int decoded_baseline = 0;
callee->SendTask([&decoded_baseline, &lossy]() {
decoded_baseline = lossy->receive()->GetStats().frames_decoded;
});
int decoded_baseline = lossy->receive()->GetStats().frames_decoded;
s.RunFor(TimeDelta::Seconds(1));
// We expect both that RTX packets were sent and that an appropriate number of
// frames were received. This is somewhat redundant but reduces the risk of
// false positives in future regressions (e.g. RTX is send due to probing).
EXPECT_GE(rtx_packets, 1);
int frames_decoded = 0;
callee->SendTask([&decoded_baseline, &frames_decoded, &lossy]() {
frames_decoded =
lossy->receive()->GetStats().frames_decoded - decoded_baseline;
});
int frames_decoded =
lossy->receive()->GetStats().frames_decoded - decoded_baseline;
EXPECT_EQ(frames_decoded, 5);
}

View File

@ -537,8 +537,8 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) {
auto ret_net = {s.CreateSimulationNode(net_conf)};
auto* client = s.CreateClient("send", CallClientConfig());
auto* callee = s.CreateClient("return", CallClientConfig());
auto* route = s.CreateRoutes(client, send_net, callee, ret_net);
auto* route = s.CreateRoutes(
client, send_net, s.CreateClient("return", CallClientConfig()), ret_net);
// TODO(srte): Make this work with RTX enabled or remove it.
auto* video = s.CreateVideoStream(route->forward(), [](VideoStreamConfig* c) {
c->stream.use_rtx = false;
@ -553,17 +553,9 @@ DataRate AverageBitrateAfterCrossInducedLoss(std::string name) {
s.net()->StopCrossTraffic(tcp_traffic);
s.RunFor(TimeDelta::Seconds(20));
}
// Querying the video stats from within the expected runtime environment
// (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
// we're currently on).
VideoReceiveStream::Stats video_receive_stats;
auto* video_stream = video->receive();
callee->SendTask([&video_stream, &video_receive_stats]() {
video_receive_stats = video_stream->GetStats();
});
return DataSize::Bytes(
video_receive_stats.rtp_stats.packet_counter.TotalBytes()) /
return DataSize::Bytes(video->receive()
->GetStats()
.rtp_stats.packet_counter.TotalBytes()) /
s.TimeSinceStart();
}

View File

@ -26,39 +26,12 @@ rtc_library("repeating_task") {
]
}
rtc_library("pending_task_safety_flag") {
sources = [
"pending_task_safety_flag.cc",
"pending_task_safety_flag.h",
]
deps = [
"..:checks",
"..:refcount",
"..:thread_checker",
"../../api:scoped_refptr",
"../synchronization:sequence_checker",
]
}
rtc_source_set("to_queued_task") {
sources = [ "to_queued_task.h" ]
deps = [ "../../api/task_queue" ]
}
if (rtc_include_tests) {
rtc_library("pending_task_safety_flag_unittests") {
testonly = true
sources = [ "pending_task_safety_flag_unittest.cc" ]
deps = [
":pending_task_safety_flag",
":to_queued_task",
"..:rtc_base_approved",
"..:rtc_task_queue",
"..:task_queue_for_test",
"../../test:test_support",
]
}
rtc_library("repeating_task_unittests") {
testonly = true
sources = [ "repeating_task_unittest.cc" ]

View File

@ -1,32 +0,0 @@
/*
* Copyright 2020 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/ref_counted_object.h"
namespace webrtc {
// static
PendingTaskSafetyFlag::Pointer PendingTaskSafetyFlag::Create() {
return new rtc::RefCountedObject<PendingTaskSafetyFlag>();
}
void PendingTaskSafetyFlag::SetNotAlive() {
RTC_DCHECK_RUN_ON(&main_sequence_);
alive_ = false;
}
bool PendingTaskSafetyFlag::alive() const {
RTC_DCHECK_RUN_ON(&main_sequence_);
return alive_;
}
} // namespace webrtc

View File

@ -1,61 +0,0 @@
/*
* Copyright 2020 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
#define RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_
#include "api/scoped_refptr.h"
#include "rtc_base/checks.h"
#include "rtc_base/ref_count.h"
#include "rtc_base/synchronization/sequence_checker.h"
namespace webrtc {
// Use this flag to drop pending tasks that have been posted to the "main"
// thread/TQ and end up running after the owning instance has been
// deleted. The owning instance signals deletion by calling SetNotAlive() from
// its destructor.
//
// When posting a task, post a copy (capture by-value in a lambda) of the flag
// instance and before performing the work, check the |alive()| state. Abort if
// alive() returns |false|:
//
// // Running outside of the main thread.
// my_task_queue_->PostTask(ToQueuedTask(
// [safety = pending_task_safety_flag_, this]() {
// // Now running on the main thread.
// if (!safety->alive())
// return;
// MyMethod();
// }));
//
// Note that checking the state only works on the construction/destruction
// thread of the ReceiveStatisticsProxy instance.
class PendingTaskSafetyFlag : public rtc::RefCountInterface {
public:
using Pointer = rtc::scoped_refptr<PendingTaskSafetyFlag>;
static Pointer Create();
~PendingTaskSafetyFlag() = default;
void SetNotAlive();
bool alive() const;
protected:
PendingTaskSafetyFlag() = default;
private:
bool alive_ = true;
SequenceChecker main_sequence_;
};
} // namespace webrtc
#endif // RTC_BASE_TASK_UTILS_PENDING_TASK_SAFETY_FLAG_H_

View File

@ -1,151 +0,0 @@
/*
* Copyright 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include <memory>
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_queue_for_test.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "test/gmock.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
using ::testing::AtLeast;
using ::testing::Invoke;
using ::testing::MockFunction;
using ::testing::NiceMock;
using ::testing::Return;
} // namespace
TEST(PendingTaskSafetyFlagTest, Basic) {
PendingTaskSafetyFlag::Pointer safety_flag;
{
// Scope for the |owner| instance.
class Owner {
public:
Owner() = default;
~Owner() { flag_->SetNotAlive(); }
PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
} owner;
EXPECT_TRUE(owner.flag_->alive());
safety_flag = owner.flag_;
EXPECT_TRUE(safety_flag->alive());
}
EXPECT_FALSE(safety_flag->alive());
}
TEST(PendingTaskSafetyFlagTest, PendingTaskSuccess) {
TaskQueueForTest tq1("OwnerHere");
TaskQueueForTest tq2("OwnerNotHere");
class Owner {
public:
Owner() : tq_main_(TaskQueueBase::Current()) { RTC_DCHECK(tq_main_); }
~Owner() {
RTC_DCHECK(tq_main_->IsCurrent());
flag_->SetNotAlive();
}
void DoStuff() {
RTC_DCHECK(!tq_main_->IsCurrent());
tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
if (!safe->alive())
return;
stuff_done_ = true;
}));
}
bool stuff_done() const { return stuff_done_; }
private:
TaskQueueBase* const tq_main_;
bool stuff_done_ = false;
PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
};
std::unique_ptr<Owner> owner;
tq1.SendTask(
[&owner]() {
owner.reset(new Owner());
EXPECT_FALSE(owner->stuff_done());
},
RTC_FROM_HERE);
ASSERT_TRUE(owner);
tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
tq1.SendTask(
[&owner]() {
EXPECT_TRUE(owner->stuff_done());
owner.reset();
},
RTC_FROM_HERE);
ASSERT_FALSE(owner);
}
TEST(PendingTaskSafetyFlagTest, PendingTaskDropped) {
TaskQueueForTest tq1("OwnerHere");
TaskQueueForTest tq2("OwnerNotHere");
class Owner {
public:
explicit Owner(bool* stuff_done)
: tq_main_(TaskQueueBase::Current()), stuff_done_(stuff_done) {
RTC_DCHECK(tq_main_);
*stuff_done_ = false;
}
~Owner() {
RTC_DCHECK(tq_main_->IsCurrent());
flag_->SetNotAlive();
}
void DoStuff() {
RTC_DCHECK(!tq_main_->IsCurrent());
tq_main_->PostTask(ToQueuedTask([safe = flag_, this]() {
if (!safe->alive())
return;
*stuff_done_ = true;
}));
}
private:
TaskQueueBase* const tq_main_;
bool* const stuff_done_;
PendingTaskSafetyFlag::Pointer flag_{PendingTaskSafetyFlag::Create()};
};
std::unique_ptr<Owner> owner;
bool stuff_done = false;
tq1.SendTask([&owner, &stuff_done]() { owner.reset(new Owner(&stuff_done)); },
RTC_FROM_HERE);
ASSERT_TRUE(owner);
// Queue up a task on tq1 that will execute before the 'DoStuff' task
// can, and delete the |owner| before the 'stuff' task can execute.
rtc::Event blocker;
tq1.PostTask([&blocker, &owner]() {
blocker.Wait(rtc::Event::kForever);
owner.reset();
});
// Queue up a DoStuff...
tq2.SendTask([&owner]() { owner->DoStuff(); }, RTC_FROM_HERE);
ASSERT_TRUE(owner);
blocker.Set();
// Run an empty task on tq1 to flush all the queued tasks.
tq1.SendTask([]() {}, RTC_FROM_HERE);
ASSERT_FALSE(owner);
EXPECT_FALSE(stuff_done);
}
} // namespace webrtc

View File

@ -113,11 +113,6 @@ class CallClient : public EmulatedNetworkReceiverInterface {
void OnPacketReceived(EmulatedIpPacket packet) override;
std::unique_ptr<RtcEventLogOutput> GetLogWriter(std::string name);
// Exposed publicly so that tests can execute tasks such as querying stats
// for media streams in the expected runtime environment (essentially what
// CallClient does internally for GetStats()).
void SendTask(std::function<void()> task);
private:
friend class Scenario;
friend class CallClientPair;
@ -134,6 +129,7 @@ class CallClient : public EmulatedNetworkReceiverInterface {
uint32_t GetNextAudioLocalSsrc();
uint32_t GetNextRtxSsrc();
void AddExtensions(std::vector<RtpExtension> extensions);
void SendTask(std::function<void()> task);
int16_t Bind(EmulatedEndpoint* endpoint);
void UnBind();

View File

@ -25,26 +25,17 @@ void CreateAnalyzedStream(Scenario* s,
VideoStreamConfig::Encoder::Implementation::kSoftware;
config.hooks.frame_pair_handlers = {analyzer->Handler()};
auto* caller = s->CreateClient("caller", CallClientConfig());
auto* callee = s->CreateClient("callee", CallClientConfig());
auto route =
s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)}, callee,
s->CreateRoutes(caller, {s->CreateSimulationNode(network_config)},
s->CreateClient("callee", CallClientConfig()),
{s->CreateSimulationNode(NetworkSimulationConfig())});
VideoStreamPair* video = s->CreateVideoStream(route->forward(), config);
auto* video = s->CreateVideoStream(route->forward(), config);
auto* audio = s->CreateAudioStream(route->forward(), AudioStreamConfig());
s->Every(TimeDelta::Seconds(1), [=] {
collectors->call.AddStats(caller->GetStats());
collectors->video_send.AddStats(video->send()->GetStats(), s->Now());
collectors->audio_receive.AddStats(audio->receive()->GetStats());
// Querying the video stats from within the expected runtime environment
// (i.e. the TQ that belongs to the CallClient, not the Scenario TQ that
// we're currently on).
VideoReceiveStream::Stats video_receive_stats;
auto* video_stream = video->receive();
callee->SendTask([&video_stream, &video_receive_stats]() {
video_receive_stats = video_stream->GetStats();
});
collectors->video_receive.AddStats(video_receive_stats);
collectors->video_send.AddStats(video->send()->GetStats(), s->Now());
collectors->video_receive.AddStats(video->receive()->GetStats());
});
}
} // namespace

View File

@ -115,7 +115,6 @@ rtc_library("video") {
"../rtc_base/experiments:rate_control_settings",
"../rtc_base/synchronization:sequence_checker",
"../rtc_base/system:thread_registry",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:repeating_task",
"../rtc_base/task_utils:to_queued_task",
"../rtc_base/time:timestamp_extrapolator",

View File

@ -18,7 +18,6 @@
#include "call/simulated_network.h"
#include "modules/rtp_rtcp/source/rtp_packet.h"
#include "modules/video_coding/codecs/vp8/include/vp8.h"
#include "rtc_base/event.h"
#include "rtc_base/task_queue_for_test.h"
#include "system_wrappers/include/sleep.h"
#include "test/call_test.h"
@ -204,7 +203,7 @@ TEST_F(RetransmissionEndToEndTest, ReceivesNackAndRetransmitsAudio) {
TEST_F(RetransmissionEndToEndTest,
StopSendingKeyframeRequestsForInactiveStream) {
class KeyframeRequestObserver : public test::EndToEndTest, public QueuedTask {
class KeyframeRequestObserver : public test::EndToEndTest {
public:
explicit KeyframeRequestObserver(TaskQueueBase* task_queue)
: clock_(Clock::GetRealTimeClock()), task_queue_(task_queue) {}
@ -217,59 +216,28 @@ TEST_F(RetransmissionEndToEndTest,
receive_stream_ = receive_streams[0];
}
Action OnReceiveRtcp(const uint8_t* packet, size_t length) override {
test::RtcpPacketParser parser;
EXPECT_TRUE(parser.Parse(packet, length));
if (parser.pli()->num_packets() > 0)
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
return SEND_PACKET;
}
bool PollStats() {
if (receive_stream_->GetStats().frames_decoded > 0) {
frame_decoded_ = true;
} else if (clock_->TimeInMilliseconds() - start_time_ < 5000) {
task_queue_->PostDelayedTask(std::unique_ptr<QueuedTask>(this), 100);
return false;
}
return true;
}
void PerformTest() override {
start_time_ = clock_->TimeInMilliseconds();
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
test_done_.Wait(rtc::Event::kForever);
}
bool Run() override {
if (!frame_decoded_) {
if (PollStats()) {
send_stream_->Stop();
if (!frame_decoded_) {
test_done_.Set();
} else {
// Now we wait for the PLI packet. Once we receive it, a task
// will be posted (see OnReceiveRtcp) and we'll check the stats
// once more before signaling that we're done.
}
bool frame_decoded = false;
int64_t start_time = clock_->TimeInMilliseconds();
while (clock_->TimeInMilliseconds() - start_time <= 5000) {
if (receive_stream_->GetStats().frames_decoded > 0) {
frame_decoded = true;
break;
}
} else {
EXPECT_EQ(
1U,
receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
test_done_.Set();
SleepMs(100);
}
return false;
ASSERT_TRUE(frame_decoded);
SendTask(RTC_FROM_HERE, task_queue_, [this]() { send_stream_->Stop(); });
SleepMs(10000);
ASSERT_EQ(
1U, receive_stream_->GetStats().rtcp_packet_type_counts.pli_packets);
}
private:
Clock* const clock_;
Clock* clock_;
VideoSendStream* send_stream_;
VideoReceiveStream* receive_stream_;
TaskQueueBase* const task_queue_;
rtc::Event test_done_;
bool frame_decoded_ = false;
int64_t start_time_ = 0;
} test(task_queue());
RunBaseTest(&test);

View File

@ -297,7 +297,6 @@ TEST_F(StatsEndToEndTest, GetStats) {
const std::vector<VideoReceiveStream*>& receive_streams) override {
send_stream_ = send_stream;
receive_streams_ = receive_streams;
task_queue_ = TaskQueueBase::Current();
}
void PerformTest() override {
@ -308,10 +307,8 @@ TEST_F(StatsEndToEndTest, GetStats) {
bool send_ok = false;
while (now_ms < stop_time_ms) {
if (!receive_ok && task_queue_) {
SendTask(RTC_FROM_HERE, task_queue_,
[&]() { receive_ok = CheckReceiveStats(); });
}
if (!receive_ok)
receive_ok = CheckReceiveStats();
if (!send_ok)
send_ok = CheckSendStats();
@ -349,7 +346,6 @@ TEST_F(StatsEndToEndTest, GetStats) {
rtc::Event check_stats_event_;
ReceiveStreamRenderer receive_stream_renderer_;
TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@ -381,28 +377,22 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) {
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
receive_streams_ = receive_streams;
task_queue_ = TaskQueueBase::Current();
}
void PerformTest() override {
// No frames reported initially.
SendTask(RTC_FROM_HERE, task_queue_, [&]() {
for (const auto& receive_stream : receive_streams_) {
EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
}
});
for (const auto& receive_stream : receive_streams_) {
EXPECT_FALSE(receive_stream->GetStats().timing_frame_info);
}
// Wait for at least one timing frame to be sent with 100ms grace period.
SleepMs(kDefaultTimingFramesDelayMs + 100);
// Check that timing frames are reported for each stream.
SendTask(RTC_FROM_HERE, task_queue_, [&]() {
for (const auto& receive_stream : receive_streams_) {
EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
}
});
for (const auto& receive_stream : receive_streams_) {
EXPECT_TRUE(receive_stream->GetStats().timing_frame_info);
}
}
std::vector<VideoReceiveStream*> receive_streams_;
TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@ -410,8 +400,7 @@ TEST_F(StatsEndToEndTest, TimingFramesAreReported) {
TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
static const size_t kNumRtpPacketsToSend = 5;
class ReceivedRtpStatsObserver : public test::EndToEndTest,
public QueuedTask {
class ReceivedRtpStatsObserver : public test::EndToEndTest {
public:
ReceivedRtpStatsObserver()
: EndToEndTest(kDefaultTimeoutMs),
@ -423,14 +412,14 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
VideoSendStream* send_stream,
const std::vector<VideoReceiveStream*>& receive_streams) override {
receive_stream_ = receive_streams[0];
task_queue_ = TaskQueueBase::Current();
EXPECT_TRUE(task_queue_ != nullptr);
}
Action OnSendRtp(const uint8_t* packet, size_t length) override {
if (sent_rtp_ >= kNumRtpPacketsToSend) {
// Need to check the stats on the correct thread.
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
VideoReceiveStream::Stats stats = receive_stream_->GetStats();
if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
observation_complete_.Set();
}
return DROP_PACKET;
}
++sent_rtp_;
@ -442,17 +431,8 @@ TEST_F(StatsEndToEndTest, TestReceivedRtpPacketStats) {
<< "Timed out while verifying number of received RTP packets.";
}
bool Run() override {
VideoReceiveStream::Stats stats = receive_stream_->GetStats();
if (kNumRtpPacketsToSend == stats.rtp_stats.packet_counter.packets) {
observation_complete_.Set();
}
return false;
}
VideoReceiveStream* receive_stream_;
uint32_t sent_rtp_;
TaskQueueBase* task_queue_ = nullptr;
} test;
RunBaseTest(&test);
@ -598,7 +578,7 @@ TEST_F(StatsEndToEndTest, MAYBE_ContentTypeSwitches) {
TEST_F(StatsEndToEndTest, VerifyNackStats) {
static const int kPacketNumberToDrop = 200;
class NackObserver : public test::EndToEndTest, public QueuedTask {
class NackObserver : public test::EndToEndTest {
public:
NackObserver()
: EndToEndTest(kLongTimeoutMs),
@ -618,7 +598,7 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
dropped_rtp_packet_ = header.sequenceNumber;
return DROP_PACKET;
}
task_queue_->PostTask(std::unique_ptr<QueuedTask>(this));
VerifyStats();
return SEND_PACKET;
}
@ -679,14 +659,6 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
const std::vector<VideoReceiveStream*>& receive_streams) override {
send_stream_ = send_stream;
receive_streams_ = receive_streams;
task_queue_ = TaskQueueBase::Current();
EXPECT_TRUE(task_queue_ != nullptr);
}
bool Run() override {
rtc::CritScope lock(&crit_);
VerifyStats();
return false;
}
void PerformTest() override {
@ -701,7 +673,6 @@ TEST_F(StatsEndToEndTest, VerifyNackStats) {
std::vector<VideoReceiveStream*> receive_streams_;
VideoSendStream* send_stream_;
absl::optional<int64_t> start_runtime_ms_;
TaskQueueBase* task_queue_ = nullptr;
} test;
metrics::Reset();

View File

@ -18,12 +18,10 @@
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/strings/string_builder.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/clock.h"
#include "system_wrappers/include/field_trial.h"
#include "system_wrappers/include/metrics.h"
#include "video/video_receive_stream.h"
namespace webrtc {
namespace {
@ -84,9 +82,9 @@ std::string UmaSuffixForContentType(VideoContentType content_type) {
ReceiveStatisticsProxy::ReceiveStatisticsProxy(
const VideoReceiveStream::Config* config,
Clock* clock,
TaskQueueBase* worker_thread)
Clock* clock)
: clock_(clock),
config_(*config),
start_ms_(clock->TimeInMilliseconds()),
enable_decode_time_histograms_(
!field_trial::IsEnabled("WebRTC-DecodeTimeHistogramsKillSwitch")),
@ -119,53 +117,27 @@ ReceiveStatisticsProxy::ReceiveStatisticsProxy(
last_codec_type_(kVideoCodecVP8),
num_delayed_frames_rendered_(0),
sum_missed_render_deadline_ms_(0),
timing_frame_info_counter_(kMovingMaxWindowMs),
worker_thread_(worker_thread) {
RTC_DCHECK(worker_thread);
decode_queue_.Detach();
incoming_render_queue_.Detach();
stats_.ssrc = config->rtp.remote_ssrc;
}
ReceiveStatisticsProxy::ReceiveStatisticsProxy(
const VideoReceiveStream::Config* config,
Clock* clock)
: ReceiveStatisticsProxy(config, clock, internal::GetCurrentTaskQueue()) {}
ReceiveStatisticsProxy::~ReceiveStatisticsProxy() {
RTC_DCHECK_RUN_ON(&main_thread_);
task_safety_flag_->SetNotAlive();
timing_frame_info_counter_(kMovingMaxWindowMs) {
decode_thread_.Detach();
network_thread_.Detach();
stats_.ssrc = config_.rtp.remote_ssrc;
}
void ReceiveStatisticsProxy::UpdateHistograms(
absl::optional<int> fraction_lost,
const StreamDataCounters& rtp_stats,
const StreamDataCounters* rtx_stats) {
{
// TODO(webrtc:11489): Delete this scope after refactoring.
// We're actually on the main thread here, below is the explanation for
// why we use another thread checker. Once refactored, we can clean this
// up and not use the decode_queue_ checker here.
RTC_DCHECK_RUN_ON(&main_thread_);
}
// We're not actually running on the decoder thread, but must be called after
// Not actually running on the decoder thread, but must be called after
// DecoderThreadStopped, which detaches the thread checker. It is therefore
// safe to access |qp_counters_|, which were updated on the decode thread
// earlier.
RTC_DCHECK_RUN_ON(&decode_queue_);
RTC_DCHECK_RUN_ON(&decode_thread_);
rtc::CritScope lock(&crit_);
// TODO(webrtc:11489): Many of these variables don't need to be inside the
// scope of a lock. Also consider grabbing the lock only to copy the state
// that histograms need to be reported for, then report histograms while not
// holding the lock.
char log_stream_buf[8 * 1024];
rtc::SimpleStringBuilder log_stream(log_stream_buf);
int stream_duration_sec = (clock_->TimeInMilliseconds() - start_ms_) / 1000;
if (stats_.frame_counts.key_frames > 0 ||
stats_.frame_counts.delta_frames > 0) {
RTC_HISTOGRAM_COUNTS_100000("WebRTC.Video.ReceiveStreamLifetimeInSeconds",
@ -504,8 +476,6 @@ void ReceiveStatisticsProxy::UpdateHistograms(
}
void ReceiveStatisticsProxy::QualitySample() {
RTC_DCHECK_RUN_ON(&incoming_render_queue_);
int64_t now = clock_->TimeInMilliseconds();
if (last_sample_time_ + kMinSampleLengthMs > now)
return;
@ -575,8 +545,6 @@ void ReceiveStatisticsProxy::QualitySample() {
}
void ReceiveStatisticsProxy::UpdateFramerate(int64_t now_ms) const {
// TODO(webrtc:11489): Currently seems to be called from two threads,
// main and decode. Consider moving both to main.
int64_t old_frames_ms = now_ms - kRateStatisticsWindowSizeMs;
while (!frame_window_.empty() &&
frame_window_.begin()->first < old_frames_ms) {
@ -592,9 +560,6 @@ void ReceiveStatisticsProxy::UpdateDecodeTimeHistograms(
int width,
int height,
int decode_time_ms) const {
RTC_DCHECK_RUN_ON(&decode_queue_);
// TODO(webrtc:11489): Consider posting the work to the worker thread.
bool is_4k = (width == 3840 || width == 4096) && height == 2160;
bool is_hd = width == 1920 && height == 1080;
// Only update histograms for 4k/HD and VP9/H264.
@ -649,7 +614,6 @@ void ReceiveStatisticsProxy::UpdateDecodeTimeHistograms(
absl::optional<int64_t>
ReceiveStatisticsProxy::GetCurrentEstimatedPlayoutNtpTimestampMs(
int64_t now_ms) const {
RTC_DCHECK_RUN_ON(&main_thread_);
if (!last_estimated_playout_ntp_timestamp_ms_ ||
!last_estimated_playout_time_ms_) {
return absl::nullopt;
@ -659,12 +623,6 @@ ReceiveStatisticsProxy::GetCurrentEstimatedPlayoutNtpTimestampMs(
}
VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const {
RTC_DCHECK_RUN_ON(&main_thread_);
// Like VideoReceiveStream::GetStats, called on the worker thread from
// StatsCollector::ExtractMediaInfo via worker_thread()->Invoke().
// WebRtcVideoChannel::GetStats(), GetVideoReceiverInfo.
rtc::CritScope lock(&crit_);
// Get current frame rates here, as only updating them on new frames prevents
// us from ever correctly displaying frame rate of 0.
@ -696,16 +654,12 @@ VideoReceiveStream::Stats ReceiveStatisticsProxy::GetStats() const {
}
void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) {
RTC_DCHECK_RUN_ON(&decode_queue_);
rtc::CritScope lock(&crit_);
stats_.current_payload_type = payload_type;
}
void ReceiveStatisticsProxy::OnDecoderImplementationName(
const char* implementation_name) {
RTC_DCHECK_RUN_ON(&decode_queue_);
// TODO(webrtc:11489): is a lock needed for this variable? Currently seems to
// be only touched on the decoder queue.
rtc::CritScope lock(&crit_);
stats_.decoder_implementation_name = implementation_name;
}
@ -717,7 +671,6 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated(
int jitter_buffer_ms,
int min_playout_delay_ms,
int render_delay_ms) {
RTC_DCHECK_RUN_ON(&decode_queue_);
rtc::CritScope lock(&crit_);
stats_.max_decode_ms = max_decode_ms;
stats_.current_delay_ms = current_delay_ms;
@ -734,14 +687,12 @@ void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated(
}
void ReceiveStatisticsProxy::OnUniqueFramesCounted(int num_unique_frames) {
RTC_DCHECK_RUN_ON(&main_thread_);
rtc::CritScope lock(&crit_);
num_unique_frames_.emplace(num_unique_frames);
}
void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated(
const TimingFrameInfo& info) {
RTC_DCHECK_RUN_ON(&decode_queue_);
rtc::CritScope lock(&crit_);
if (info.flags != VideoSendTiming::kInvalid) {
int64_t now_ms = clock_->TimeInMilliseconds();
@ -763,28 +714,6 @@ void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated(
void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated(
uint32_t ssrc,
const RtcpPacketTypeCounter& packet_counter) {
if (!worker_thread_->IsCurrent()) {
// RtpRtcp::Configuration has a single RtcpPacketTypeCounterObserver and
// that same configuration may be used for both receiver and sender
// (see ModuleRtpRtcpImpl::ModuleRtpRtcpImpl).
// The RTCPSender implementation currently makes calls to this function on a
// process thread whereas the RTCPReceiver implementation calls back on the
// [main] worker thread.
// So until the sender implementation has been updated, we work around this
// here by posting the update to the expected thread. We make a by value
// copy of the |task_safety_flag_| to handle the case if the queued task
// runs after the |ReceiveStatisticsProxy| has been deleted. In such a
// case the packet_counter update won't be recorded.
worker_thread_->PostTask(ToQueuedTask(
[safety = task_safety_flag_, ssrc, packet_counter, this]() {
if (!safety->alive())
return;
RtcpPacketTypesCounterUpdated(ssrc, packet_counter);
}));
return;
}
RTC_DCHECK_RUN_ON(&main_thread_);
rtc::CritScope lock(&crit_);
if (stats_.ssrc != ssrc)
return;
@ -792,7 +721,6 @@ void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated(
}
void ReceiveStatisticsProxy::OnCname(uint32_t ssrc, absl::string_view cname) {
RTC_DCHECK_RUN_ON(&main_thread_);
rtc::CritScope lock(&crit_);
// TODO(pbos): Handle both local and remote ssrcs here and RTC_DCHECK that we
// receive stats from one of them.
@ -805,13 +733,9 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame,
absl::optional<uint8_t> qp,
int32_t decode_time_ms,
VideoContentType content_type) {
RTC_DCHECK_RUN_ON(&decode_queue_);
// TODO(webrtc:11489): - Same as OnRenderedFrame. Both called from within
// VideoStreamDecoder::FrameToRender
rtc::CritScope lock(&crit_);
const uint64_t now_ms = clock_->TimeInMilliseconds();
uint64_t now_ms = clock_->TimeInMilliseconds();
if (videocontenttypehelpers::IsScreenshare(content_type) !=
videocontenttypehelpers::IsScreenshare(last_content_type_)) {
@ -870,10 +794,6 @@ void ReceiveStatisticsProxy::OnDecodedFrame(const VideoFrame& frame,
}
void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) {
RTC_DCHECK_RUN_ON(&incoming_render_queue_);
// TODO(webrtc:11489): Consider posting the work to the worker thread.
// - Called from VideoReceiveStream::OnFrame.
int width = frame.width();
int height = frame.height();
RTC_DCHECK_GT(width, 0);
@ -913,10 +833,7 @@ void ReceiveStatisticsProxy::OnRenderedFrame(const VideoFrame& frame) {
void ReceiveStatisticsProxy::OnSyncOffsetUpdated(int64_t video_playout_ntp_ms,
int64_t sync_offset_ms,
double estimated_freq_khz) {
RTC_DCHECK_RUN_ON(&incoming_render_queue_);
rtc::CritScope lock(&crit_);
// TODO(webrtc:11489): Lock possibly not needed for sync_offset_counter_ if
// it's only touched on the decoder thread.
sync_offset_counter_.Add(std::abs(sync_offset_ms));
stats_.sync_offset_ms = sync_offset_ms;
last_estimated_playout_ntp_timestamp_ms_ = video_playout_ntp_ms;
@ -969,7 +886,7 @@ void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) {
}
void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) {
RTC_DCHECK_RUN_ON(&decode_queue_);
RTC_DCHECK_RUN_ON(&decode_thread_);
rtc::CritScope lock(&crit_);
last_codec_type_ = codec_type;
if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
@ -979,8 +896,6 @@ void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) {
}
void ReceiveStatisticsProxy::OnStreamInactive() {
RTC_DCHECK_RUN_ON(&decode_queue_);
// TODO(sprang): Figure out any other state that should be reset.
rtc::CritScope lock(&crit_);
@ -991,13 +906,6 @@ void ReceiveStatisticsProxy::OnStreamInactive() {
void ReceiveStatisticsProxy::OnRttUpdate(int64_t avg_rtt_ms,
int64_t max_rtt_ms) {
// TODO(webrtc:11489): Is this a duplicate of VideoReceiveStream::OnRttUpdate?
// - looks like that runs on a/the module process thread.
//
// BUGBUG
// Actually, it looks like this method is never called except from a unit
// test, GetStatsReportsDecodeTimingStats.
rtc::CritScope lock(&crit_);
avg_rtt_ms_ = avg_rtt_ms;
}
@ -1008,7 +916,7 @@ void ReceiveStatisticsProxy::DecoderThreadStarting() {
void ReceiveStatisticsProxy::DecoderThreadStopped() {
RTC_DCHECK_RUN_ON(&main_thread_);
decode_queue_.Detach();
decode_thread_.Detach();
}
ReceiveStatisticsProxy::ContentSpecificStats::ContentSpecificStats()
@ -1029,5 +937,4 @@ void ReceiveStatisticsProxy::ContentSpecificStats::Add(
frame_counts.delta_frames += other.frame_counts.delta_frames;
interframe_delay_percentiles.Add(other.interframe_delay_percentiles);
}
} // namespace webrtc

View File

@ -17,7 +17,6 @@
#include <vector>
#include "absl/types/optional.h"
#include "api/task_queue/task_queue_base.h"
#include "call/video_receive_stream.h"
#include "modules/include/module_common_types.h"
#include "modules/video_coding/include/video_coding_defines.h"
@ -27,8 +26,6 @@
#include "rtc_base/numerics/sample_counter.h"
#include "rtc_base/rate_statistics.h"
#include "rtc_base/rate_tracker.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/thread_annotations.h"
#include "rtc_base/thread_checker.h"
#include "video/quality_threshold.h"
@ -45,14 +42,9 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
public RtcpPacketTypeCounterObserver,
public CallStatsObserver {
public:
ReceiveStatisticsProxy(const VideoReceiveStream::Config* config,
Clock* clock,
TaskQueueBase* worker_thread);
// TODO(webrtc:11489): Remove this ctor once all callers have been updated
// to use the above one.
ReceiveStatisticsProxy(const VideoReceiveStream::Config* config,
Clock* clock);
~ReceiveStatisticsProxy();
~ReceiveStatisticsProxy() = default;
VideoReceiveStream::Stats GetStats() const;
@ -147,6 +139,14 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
int64_t now_ms) const RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
Clock* const clock_;
// Ownership of this object lies with the owner of the ReceiveStatisticsProxy
// instance. Lifetime is guaranteed to outlive |this|.
// TODO(tommi): In practice the config_ reference is only used for accessing
// config_.rtp.ulpfec.ulpfec_payload_type. Instead of holding a pointer back,
// we could just store the value of ulpfec_payload_type and change the
// ReceiveStatisticsProxy() ctor to accept a const& of Config (since we'll
// then no longer store a pointer to the object).
const VideoReceiveStream::Config& config_;
const int64_t start_ms_;
const bool enable_decode_time_histograms_;
@ -177,7 +177,7 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
std::map<VideoContentType, ContentSpecificStats> content_specific_stats_
RTC_GUARDED_BY(crit_);
MaxCounter freq_offset_counter_ RTC_GUARDED_BY(crit_);
QpCounters qp_counters_ RTC_GUARDED_BY(decode_queue_);
QpCounters qp_counters_ RTC_GUARDED_BY(decode_thread_);
int64_t avg_rtt_ms_ RTC_GUARDED_BY(crit_);
mutable std::map<int64_t, size_t> frame_window_ RTC_GUARDED_BY(&crit_);
VideoContentType last_content_type_ RTC_GUARDED_BY(&crit_);
@ -196,17 +196,9 @@ class ReceiveStatisticsProxy : public VCMReceiveStatisticsCallback,
RTC_GUARDED_BY(&crit_);
absl::optional<int64_t> last_estimated_playout_time_ms_
RTC_GUARDED_BY(&crit_);
// The thread on which this instance is constructed and some of its main
// methods are invoked on such as GetStats().
TaskQueueBase* const worker_thread_;
PendingTaskSafetyFlag::Pointer task_safety_flag_{
PendingTaskSafetyFlag::Create()};
SequenceChecker decode_queue_;
rtc::ThreadChecker decode_thread_;
rtc::ThreadChecker network_thread_;
rtc::ThreadChecker main_thread_;
SequenceChecker incoming_render_queue_;
};
} // namespace webrtc

View File

@ -22,8 +22,6 @@
#include "api/video/video_frame.h"
#include "api/video/video_frame_buffer.h"
#include "api/video/video_rotation.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread.h"
#include "system_wrappers/include/metrics.h"
#include "test/field_trial.h"
#include "test/gtest.h"
@ -41,63 +39,13 @@ const int kHeight = 720;
// TODO(sakal): ReceiveStatisticsProxy is lacking unittesting.
class ReceiveStatisticsProxyTest : public ::testing::Test {
public:
ReceiveStatisticsProxyTest()
: fake_clock_(1234),
config_(GetTestConfig()),
worker_thread_(&socket_server_) {
worker_thread_.WrapCurrent();
RTC_CHECK_EQ(webrtc::TaskQueueBase::Current(),
static_cast<TaskQueueBase*>(&worker_thread_));
metrics::Reset();
statistics_proxy_.reset(
new ReceiveStatisticsProxy(&config_, &fake_clock_, &worker_thread_));
}
~ReceiveStatisticsProxyTest() override {
statistics_proxy_.reset();
worker_thread_.UnwrapCurrent();
}
ReceiveStatisticsProxyTest() : fake_clock_(1234), config_(GetTestConfig()) {}
virtual ~ReceiveStatisticsProxyTest() {}
protected:
class FakeSocketServer : public rtc::SocketServer {
public:
FakeSocketServer() = default;
~FakeSocketServer() = default;
bool Wait(int cms, bool process_io) override {
if (fail_next_wait_) {
fail_next_wait_ = false;
return false;
}
return true;
}
void WakeUp() override {}
rtc::Socket* CreateSocket(int family, int type) override { return nullptr; }
rtc::AsyncSocket* CreateAsyncSocket(int family, int type) override {
return nullptr;
}
void FailNextWait() { fail_next_wait_ = true; }
private:
bool fail_next_wait_ = false;
};
class WorkerThread : public rtc::Thread {
public:
explicit WorkerThread(rtc::SocketServer* ss)
: rtc::Thread(ss), tq_setter_(this) {}
private:
CurrentTaskQueueSetter tq_setter_;
};
void FlushWorker() {
worker_thread_.PostTask(
ToQueuedTask([this]() { socket_server_.FailNextWait(); }));
worker_thread_.ProcessMessages(1000);
virtual void SetUp() {
metrics::Reset();
statistics_proxy_.reset(new ReceiveStatisticsProxy(&config_, &fake_clock_));
}
VideoReceiveStream::Config GetTestConfig() {
@ -130,8 +78,6 @@ class ReceiveStatisticsProxyTest : public ::testing::Test {
SimulatedClock fake_clock_;
const VideoReceiveStream::Config config_;
std::unique_ptr<ReceiveStatisticsProxy> statistics_proxy_;
FakeSocketServer socket_server_;
WorkerThread worker_thread_;
};
TEST_F(ReceiveStatisticsProxyTest, OnDecodedFrameIncreasesFramesDecoded) {

View File

@ -49,14 +49,10 @@ VideoQualityObserver::VideoQualityObserver(VideoContentType content_type)
current_resolution_(Resolution::Low),
num_resolution_downgrades_(0),
time_in_blocky_video_ms_(0),
// TODO(webrtc:11489): content_type_ variable isn't necessary.
content_type_(content_type),
is_paused_(false) {}
void VideoQualityObserver::UpdateHistograms() {
// TODO(webrtc:11489): Called on the decoder thread - which _might_ be
// the same as the construction thread.
// Don't report anything on an empty video stream.
if (num_frames_rendered_ == 0) {
return;

View File

@ -182,13 +182,6 @@ constexpr int kInactiveStreamThresholdMs = 600000; // 10 minutes.
namespace internal {
TaskQueueBase* GetCurrentTaskQueue() {
TaskQueueBase* ret = TaskQueueBase::Current();
if (!ret)
ret = rtc::ThreadManager::Instance()->CurrentThread();
return ret;
}
VideoReceiveStream::VideoReceiveStream(
TaskQueueFactory* task_queue_factory,
RtpStreamReceiverControllerInterface* receiver_controller,
@ -204,11 +197,10 @@ VideoReceiveStream::VideoReceiveStream(
config_(std::move(config)),
num_cpu_cores_(num_cpu_cores),
process_thread_(process_thread),
worker_thread_(GetCurrentTaskQueue()),
clock_(clock),
call_stats_(call_stats),
source_tracker_(clock_),
stats_proxy_(&config_, clock_, worker_thread_),
stats_proxy_(&config_, clock_),
rtp_receive_statistics_(ReceiveStatistics::Create(clock_)),
timing_(timing),
video_receiver_(clock_, timing_.get()),
@ -447,7 +439,6 @@ void VideoReceiveStream::Stop() {
}
VideoReceiveStream::Stats VideoReceiveStream::GetStats() const {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
VideoReceiveStream::Stats stats = stats_proxy_.GetStats();
stats.total_bitrate_bps = 0;
StreamStatistician* statistician =
@ -466,7 +457,6 @@ VideoReceiveStream::Stats VideoReceiveStream::GetStats() const {
}
void VideoReceiveStream::UpdateHistograms() {
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
absl::optional<int> fraction_lost;
StreamDataCounters rtp_stats;
StreamStatistician* statistician =
@ -503,7 +493,6 @@ bool VideoReceiveStream::SetBaseMinimumPlayoutDelayMs(int delay_ms) {
return false;
}
// TODO(webrtc:11489): Consider posting to worker.
rtc::CritScope cs(&playout_delay_lock_);
base_minimum_playout_delay_ms_ = delay_ms;
UpdatePlayoutDelays();
@ -517,19 +506,19 @@ int VideoReceiveStream::GetBaseMinimumPlayoutDelayMs() const {
return base_minimum_playout_delay_ms_;
}
// TODO(webrtc:11489): This method grabs a lock 6 times.
// TODO(tommi): This method grabs a lock 6 times.
void VideoReceiveStream::OnFrame(const VideoFrame& video_frame) {
int64_t video_playout_ntp_ms;
int64_t sync_offset_ms;
double estimated_freq_khz;
// TODO(webrtc:11489): GetStreamSyncOffsetInMs grabs three locks. One inside
// the function itself, another in GetChannel() and a third in
// TODO(tommi): GetStreamSyncOffsetInMs grabs three locks. One inside the
// function itself, another in GetChannel() and a third in
// GetPlayoutTimestamp. Seems excessive. Anyhow, I'm assuming the function
// succeeds most of the time, which leads to grabbing a fourth lock.
if (rtp_stream_sync_.GetStreamSyncOffsetInMs(
video_frame.timestamp(), video_frame.render_time_ms(),
&video_playout_ntp_ms, &sync_offset_ms, &estimated_freq_khz)) {
// TODO(webrtc:11489): OnSyncOffsetUpdated grabs a lock.
// TODO(tommi): OnSyncOffsetUpdated grabs a lock.
stats_proxy_.OnSyncOffsetUpdated(video_playout_ntp_ms, sync_offset_ms,
estimated_freq_khz);
}
@ -537,7 +526,7 @@ void VideoReceiveStream::OnFrame(const VideoFrame& video_frame) {
config_.renderer->OnFrame(video_frame);
// TODO(webrtc:11489): OnRenderFrame grabs a lock too.
// TODO(tommi): OnRenderFrame grabs a lock too.
stats_proxy_.OnRenderedFrame(video_frame);
}
@ -574,9 +563,6 @@ void VideoReceiveStream::OnCompleteFrame(
}
last_complete_frame_time_ms_ = time_now_ms;
// TODO(webrtc:11489): We grab the playout_delay_lock_ lock potentially twice.
// Consider checking both min/max and posting to worker if there's a change.
// If we always update playout delays on the worker, we don't need a lock.
const PlayoutDelay& playout_delay = frame->EncodedImage().playout_delay_;
if (playout_delay.min_ms >= 0) {
rtc::CritScope cs(&playout_delay_lock_);
@ -632,7 +618,6 @@ void VideoReceiveStream::SetEstimatedPlayoutNtpTimestampMs(
void VideoReceiveStream::SetMinimumPlayoutDelay(int delay_ms) {
RTC_DCHECK_RUN_ON(&module_process_sequence_checker_);
// TODO(webrtc:11489): Consider posting to worker.
rtc::CritScope cs(&playout_delay_lock_);
syncable_minimum_playout_delay_ms_ = delay_ms;
UpdatePlayoutDelays();
@ -667,7 +652,6 @@ void VideoReceiveStream::StartNextDecode() {
void VideoReceiveStream::HandleEncodedFrame(
std::unique_ptr<EncodedFrame> frame) {
// Running on |decode_queue_|.
int64_t now_ms = clock_->TimeInMilliseconds();
// Current OnPreDecode only cares about QP for VP8.
@ -722,7 +706,6 @@ void VideoReceiveStream::HandleKeyFrameGeneration(
}
void VideoReceiveStream::HandleFrameBufferTimeout() {
// Running on |decode_queue_|.
int64_t now_ms = clock_->TimeInMilliseconds();
absl::optional<int64_t> last_packet_ms =
rtp_video_stream_receiver_.LastReceivedPacketMs();

View File

@ -45,12 +45,6 @@ class VCMTiming;
namespace internal {
// Utility function that fetches the TQ that's active in the current context
// or the active rtc::Thread if no TQ is active. This is necessary at the moment
// for VideoReceiveStream and downstream classes as tests and production don't
// consistently follow the same procedures.
TaskQueueBase* GetCurrentTaskQueue();
class VideoReceiveStream : public webrtc::VideoReceiveStream,
public rtc::VideoSinkInterface<VideoFrame>,
public NackSender,
@ -167,7 +161,6 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
const VideoReceiveStream::Config config_;
const int num_cpu_cores_;
ProcessThread* const process_thread_;
TaskQueueBase* const worker_thread_;
Clock* const clock_;
CallStats* const call_stats_;