Reland "Running FrameBuffer on task queue."
This is a reland of 13943b7b7f6d00568912b9969db2c7871d18e21f Original change's description: > Running FrameBuffer on task queue. > > This prepares for running WebRTC in simulated time where event::Wait > based timing doesn't work. > > Bug: webrtc:10365 > Change-Id: Ia0f9b1cc8e3c8c27a38e45b40487050a4699d8cf > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/129962 > Reviewed-by: Philip Eliasson <philipel@webrtc.org> > Reviewed-by: Erik Språng <sprang@webrtc.org> > Commit-Queue: Sebastian Jansson <srte@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#27422} Bug: webrtc:10365 Change-Id: I412d3e0fe06c6dd57cdb42974f09e03f3a6ad038 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/131124 Reviewed-by: Philip Eliasson <philipel@webrtc.org> Reviewed-by: Erik Språng <sprang@webrtc.org> Commit-Queue: Sebastian Jansson <srte@webrtc.org> Cr-Commit-Position: refs/heads/master@{#27572}
This commit is contained in:

committed by
Commit Bot

parent
70c961f965
commit
11d0d7b945
@ -151,6 +151,7 @@ rtc_static_library("video_coding") {
|
||||
"..:module_api_public",
|
||||
"../../api:fec_controller_api",
|
||||
"../../api:rtp_headers",
|
||||
"../../api/task_queue:global_task_queue_factory",
|
||||
"../../api/units:data_rate",
|
||||
"../../api/video:builtin_video_bitrate_allocator_factory",
|
||||
"../../api/video:encoded_frame",
|
||||
@ -170,6 +171,7 @@ rtc_static_library("video_coding") {
|
||||
"../../rtc_base/experiments:rtt_mult_experiment",
|
||||
"../../rtc_base/synchronization:sequence_checker",
|
||||
"../../rtc_base/system:fallthrough",
|
||||
"../../rtc_base/task_utils:repeating_task",
|
||||
"../../rtc_base/third_party/base64",
|
||||
"../../rtc_base/time:timestamp_extrapolator",
|
||||
"../../system_wrappers",
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/memory/memory.h"
|
||||
#include "api/video/encoded_image.h"
|
||||
#include "api/video/video_timing.h"
|
||||
#include "modules/video_coding/include/video_coding_defines.h"
|
||||
@ -53,6 +54,7 @@ FrameBuffer::FrameBuffer(Clock* clock,
|
||||
VCMReceiveStatisticsCallback* stats_callback)
|
||||
: decoded_frames_history_(kMaxFramesHistory),
|
||||
clock_(clock),
|
||||
callback_queue_(nullptr),
|
||||
jitter_estimator_(jitter_estimator),
|
||||
timing_(timing),
|
||||
inter_frame_delay_(clock_->TimeInMilliseconds()),
|
||||
@ -65,6 +67,55 @@ FrameBuffer::FrameBuffer(Clock* clock,
|
||||
|
||||
FrameBuffer::~FrameBuffer() {}
|
||||
|
||||
void FrameBuffer::NextFrame(
|
||||
int64_t max_wait_time_ms,
|
||||
bool keyframe_required,
|
||||
rtc::TaskQueue* callback_queue,
|
||||
std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler) {
|
||||
RTC_DCHECK_RUN_ON(callback_queue);
|
||||
TRACE_EVENT0("webrtc", "FrameBuffer::NextFrame");
|
||||
int64_t latest_return_time_ms =
|
||||
clock_->TimeInMilliseconds() + max_wait_time_ms;
|
||||
rtc::CritScope lock(&crit_);
|
||||
if (stopped_) {
|
||||
return;
|
||||
}
|
||||
latest_return_time_ms_ = latest_return_time_ms;
|
||||
keyframe_required_ = keyframe_required;
|
||||
frame_handler_ = handler;
|
||||
callback_queue_ = callback_queue;
|
||||
StartWaitForNextFrameOnQueue();
|
||||
}
|
||||
|
||||
void FrameBuffer::StartWaitForNextFrameOnQueue() {
|
||||
RTC_DCHECK(callback_queue_);
|
||||
RTC_DCHECK(!callback_task_.Running());
|
||||
int64_t wait_ms = FindNextFrame(clock_->TimeInMilliseconds());
|
||||
callback_task_ = RepeatingTaskHandle::DelayedStart(
|
||||
callback_queue_->Get(), TimeDelta::ms(wait_ms), [this] {
|
||||
// If this task has not been cancelled, we did not get any new frames
|
||||
// while waiting. Continue with frame delivery.
|
||||
rtc::CritScope lock(&crit_);
|
||||
if (!frames_to_decode_.empty()) {
|
||||
// We have frames, deliver!
|
||||
frame_handler_(absl::WrapUnique(GetNextFrame()), kFrameFound);
|
||||
CancelCallback();
|
||||
return TimeDelta::Zero(); // Ignored.
|
||||
} else if (clock_->TimeInMilliseconds() >= latest_return_time_ms_) {
|
||||
// We have timed out, signal this and stop repeating.
|
||||
frame_handler_(nullptr, kTimeout);
|
||||
CancelCallback();
|
||||
return TimeDelta::Zero(); // Ignored.
|
||||
} else {
|
||||
// If there's no frames to decode and there is still time left, it
|
||||
// means that the frame buffer was cleared between creation and
|
||||
// execution of this task. Continue waiting for the remaining time.
|
||||
int64_t wait_ms = FindNextFrame(clock_->TimeInMilliseconds());
|
||||
return TimeDelta::ms(wait_ms);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
FrameBuffer::ReturnReason FrameBuffer::NextFrame(
|
||||
int64_t max_wait_time_ms,
|
||||
std::unique_ptr<EncodedFrame>* frame_out,
|
||||
@ -313,6 +364,7 @@ void FrameBuffer::Stop() {
|
||||
rtc::CritScope lock(&crit_);
|
||||
stopped_ = true;
|
||||
new_continuous_frame_event_.Set();
|
||||
CancelCallback();
|
||||
}
|
||||
|
||||
void FrameBuffer::Clear() {
|
||||
@ -342,6 +394,12 @@ bool FrameBuffer::ValidReferences(const EncodedFrame& frame) const {
|
||||
return true;
|
||||
}
|
||||
|
||||
void FrameBuffer::CancelCallback() {
|
||||
frame_handler_ = {};
|
||||
callback_task_.Stop();
|
||||
callback_queue_ = nullptr;
|
||||
}
|
||||
|
||||
bool FrameBuffer::IsCompleteSuperFrame(const EncodedFrame& frame) {
|
||||
if (frame.inter_layer_predicted) {
|
||||
// Check that all previous spatial layers are already inserted.
|
||||
@ -487,9 +545,19 @@ int64_t FrameBuffer::InsertFrame(std::unique_ptr<EncodedFrame> frame) {
|
||||
last_continuous_picture_id = last_continuous_frame_->picture_id;
|
||||
|
||||
// Since we now have new continuous frames there might be a better frame
|
||||
// to return from NextFrame. Signal that thread so that it again can choose
|
||||
// which frame to return.
|
||||
// to return from NextFrame.
|
||||
new_continuous_frame_event_.Set();
|
||||
|
||||
if (callback_queue_) {
|
||||
callback_queue_->PostTask([this] {
|
||||
rtc::CritScope lock(&crit_);
|
||||
if (!callback_task_.Running())
|
||||
return;
|
||||
RTC_CHECK(frame_handler_);
|
||||
callback_task_.Stop();
|
||||
StartWaitForNextFrameOnQueue();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return last_continuous_picture_id;
|
||||
|
@ -27,6 +27,8 @@
|
||||
#include "rtc_base/event.h"
|
||||
#include "rtc_base/experiments/rtt_mult_experiment.h"
|
||||
#include "rtc_base/numerics/sequence_number_util.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
#include "rtc_base/task_utils/repeating_task.h"
|
||||
#include "rtc_base/thread_annotations.h"
|
||||
|
||||
namespace webrtc {
|
||||
@ -64,6 +66,11 @@ class FrameBuffer {
|
||||
ReturnReason NextFrame(int64_t max_wait_time_ms,
|
||||
std::unique_ptr<EncodedFrame>* frame_out,
|
||||
bool keyframe_required = false);
|
||||
void NextFrame(
|
||||
int64_t max_wait_time_ms,
|
||||
bool keyframe_required,
|
||||
rtc::TaskQueue* callback_queue,
|
||||
std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler);
|
||||
|
||||
// Tells the FrameBuffer which protection mode that is in use. Affects
|
||||
// the frame timing.
|
||||
@ -121,6 +128,9 @@ class FrameBuffer {
|
||||
int64_t FindNextFrame(int64_t now_ms) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
|
||||
EncodedFrame* GetNextFrame() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
|
||||
|
||||
void StartWaitForNextFrameOnQueue() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
|
||||
void CancelCallback() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
|
||||
|
||||
// Update all directly dependent and indirectly dependent frames and mark
|
||||
// them as continuous if all their references has been fulfilled.
|
||||
void PropagateContinuity(FrameMap::iterator start)
|
||||
@ -163,6 +173,11 @@ class FrameBuffer {
|
||||
|
||||
rtc::CriticalSection crit_;
|
||||
Clock* const clock_;
|
||||
|
||||
rtc::TaskQueue* callback_queue_ RTC_GUARDED_BY(crit_);
|
||||
RepeatingTaskHandle callback_task_ RTC_GUARDED_BY(crit_);
|
||||
std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)>
|
||||
frame_handler_ RTC_GUARDED_BY(crit_);
|
||||
int64_t latest_return_time_ms_ RTC_GUARDED_BY(crit_);
|
||||
bool keyframe_required_ RTC_GUARDED_BY(crit_);
|
||||
|
||||
|
@ -185,6 +185,8 @@ VideoReceiveStream::VideoReceiveStream(
|
||||
num_cpu_cores_(num_cpu_cores),
|
||||
process_thread_(process_thread),
|
||||
clock_(clock),
|
||||
use_task_queue_(
|
||||
!field_trial::IsDisabled("WebRTC-Video-DecodeOnTaskQueue")),
|
||||
decode_thread_(&DecodeThreadFunction,
|
||||
this,
|
||||
"DecodingThread",
|
||||
@ -213,7 +215,10 @@ VideoReceiveStream::VideoReceiveStream(
|
||||
.value_or(kMaxWaitForKeyFrameMs)),
|
||||
max_wait_for_frame_ms_(KeyframeIntervalSettings::ParseFromFieldTrials()
|
||||
.MaxWaitForFrameMs()
|
||||
.value_or(kMaxWaitForFrameMs)) {
|
||||
.value_or(kMaxWaitForFrameMs)),
|
||||
decode_queue_(task_queue_factory_->CreateTaskQueue(
|
||||
"DecodingQueue",
|
||||
TaskQueueFactory::Priority::HIGH)) {
|
||||
RTC_LOG(LS_INFO) << "VideoReceiveStream: " << config_.ToString();
|
||||
|
||||
RTC_DCHECK(config_.renderer);
|
||||
@ -309,7 +314,7 @@ void VideoReceiveStream::SetSync(Syncable* audio_syncable) {
|
||||
void VideoReceiveStream::Start() {
|
||||
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
|
||||
|
||||
if (decode_thread_.IsRunning()) {
|
||||
if (decoder_running_) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -388,7 +393,16 @@ void VideoReceiveStream::Start() {
|
||||
// Start the decode thread
|
||||
video_receiver_.DecoderThreadStarting();
|
||||
stats_proxy_.DecoderThreadStarting();
|
||||
decode_thread_.Start();
|
||||
if (!use_task_queue_) {
|
||||
decode_thread_.Start();
|
||||
} else {
|
||||
decode_queue_.PostTask([this] {
|
||||
RTC_DCHECK_RUN_ON(&decode_queue_);
|
||||
decoder_stopped_ = false;
|
||||
StartNextDecode();
|
||||
});
|
||||
}
|
||||
decoder_running_ = true;
|
||||
rtp_video_stream_receiver_.StartReceive();
|
||||
}
|
||||
|
||||
@ -399,16 +413,31 @@ void VideoReceiveStream::Stop() {
|
||||
stats_proxy_.OnUniqueFramesCounted(
|
||||
rtp_video_stream_receiver_.GetUniqueFramesSeen());
|
||||
|
||||
frame_buffer_->Stop();
|
||||
if (!use_task_queue_) {
|
||||
frame_buffer_->Stop();
|
||||
} else {
|
||||
decode_queue_.PostTask([this] { frame_buffer_->Stop(); });
|
||||
}
|
||||
call_stats_->DeregisterStatsObserver(this);
|
||||
|
||||
if (decode_thread_.IsRunning()) {
|
||||
if (decoder_running_) {
|
||||
// TriggerDecoderShutdown will release any waiting decoder thread and make
|
||||
// it stop immediately, instead of waiting for a timeout. Needs to be called
|
||||
// before joining the decoder thread.
|
||||
video_receiver_.TriggerDecoderShutdown();
|
||||
|
||||
decode_thread_.Stop();
|
||||
if (!use_task_queue_) {
|
||||
decode_thread_.Stop();
|
||||
} else {
|
||||
rtc::Event done;
|
||||
decode_queue_.PostTask([this, &done] {
|
||||
RTC_DCHECK_RUN_ON(&decode_queue_);
|
||||
decoder_stopped_ = true;
|
||||
done.Set();
|
||||
});
|
||||
done.Wait(rtc::Event::kForever);
|
||||
}
|
||||
decoder_running_ = false;
|
||||
video_receiver_.DecoderThreadStopped();
|
||||
stats_proxy_.DecoderThreadStopped();
|
||||
// Deregister external decoders so they are no longer running during
|
||||
@ -572,6 +601,38 @@ int64_t VideoReceiveStream::GetWaitMs() const {
|
||||
return keyframe_required_ ? max_wait_for_keyframe_ms_
|
||||
: max_wait_for_frame_ms_;
|
||||
}
|
||||
|
||||
void VideoReceiveStream::StartNextDecode() {
|
||||
RTC_DCHECK(use_task_queue_);
|
||||
TRACE_EVENT0("webrtc", "VideoReceiveStream::StartNextDecode");
|
||||
|
||||
struct DecodeTask {
|
||||
void operator()() {
|
||||
RTC_DCHECK_RUN_ON(&stream->decode_queue_);
|
||||
if (stream->decoder_stopped_)
|
||||
return;
|
||||
if (frame) {
|
||||
stream->HandleEncodedFrame(std::move(frame));
|
||||
} else {
|
||||
stream->HandleFrameBufferTimeout();
|
||||
}
|
||||
stream->StartNextDecode();
|
||||
}
|
||||
VideoReceiveStream* stream;
|
||||
std::unique_ptr<EncodedFrame> frame;
|
||||
};
|
||||
|
||||
// TODO(philipel): Call NextFrame with |keyframe_required| argument set when
|
||||
// downstream project has been fixed.
|
||||
frame_buffer_->NextFrame(
|
||||
GetWaitMs(), /*keyframe_required*/ false, &decode_queue_,
|
||||
[this](std::unique_ptr<EncodedFrame> frame, ReturnReason res) {
|
||||
RTC_DCHECK_EQ(frame == nullptr, res == ReturnReason::kTimeout);
|
||||
RTC_DCHECK_EQ(frame != nullptr, res == ReturnReason::kFrameFound);
|
||||
decode_queue_.PostTask(DecodeTask{this, std::move(frame)});
|
||||
});
|
||||
}
|
||||
|
||||
void VideoReceiveStream::DecodeThreadFunction(void* ptr) {
|
||||
ScopedRegisterThreadForDebugging thread_dbg(RTC_FROM_HERE);
|
||||
while (static_cast<VideoReceiveStream*>(ptr)->Decode()) {
|
||||
@ -579,6 +640,7 @@ void VideoReceiveStream::DecodeThreadFunction(void* ptr) {
|
||||
}
|
||||
|
||||
bool VideoReceiveStream::Decode() {
|
||||
RTC_DCHECK(!use_task_queue_);
|
||||
TRACE_EVENT0("webrtc", "VideoReceiveStream::Decode");
|
||||
|
||||
std::unique_ptr<video_coding::EncodedFrame> frame;
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "modules/video_coding/frame_buffer2.h"
|
||||
#include "modules/video_coding/video_coding_impl.h"
|
||||
#include "rtc_base/synchronization/sequence_checker.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
#include "system_wrappers/include/clock.h"
|
||||
#include "video/receive_statistics_proxy.h"
|
||||
#include "video/rtp_streams_synchronizer.h"
|
||||
@ -133,6 +134,7 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
|
||||
|
||||
private:
|
||||
int64_t GetWaitMs() const;
|
||||
void StartNextDecode() RTC_RUN_ON(decode_queue_);
|
||||
static void DecodeThreadFunction(void* ptr);
|
||||
bool Decode();
|
||||
void HandleEncodedFrame(std::unique_ptr<video_coding::EncodedFrame> frame);
|
||||
@ -153,10 +155,15 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
|
||||
ProcessThread* const process_thread_;
|
||||
Clock* const clock_;
|
||||
|
||||
const bool use_task_queue_;
|
||||
|
||||
rtc::PlatformThread decode_thread_;
|
||||
|
||||
CallStats* const call_stats_;
|
||||
|
||||
bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
|
||||
bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true;
|
||||
|
||||
ReceiveStatisticsProxy stats_proxy_;
|
||||
// Shared by media and rtx stream receivers, since the latter has no RtpRtcp
|
||||
// module of its own.
|
||||
@ -211,6 +218,9 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
|
||||
|
||||
// Maximum delay as decided by the RTP playout delay extension.
|
||||
int frame_maximum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = -1;
|
||||
|
||||
// Defined last so they are destroyed before all other members.
|
||||
rtc::TaskQueue decode_queue_;
|
||||
};
|
||||
} // namespace internal
|
||||
} // namespace webrtc
|
||||
|
Reference in New Issue
Block a user