From c680c4a80786d9e782ee967848cceb74d85d6738 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Henrik=20Bostr=C3=B6m?= <hbos@webrtc.org>
Date: Wed, 3 Apr 2019 10:27:36 +0000
Subject: [PATCH] Revert "Running FrameBuffer on task queue."
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This reverts commit 13943b7b7f6d00568912b9969db2c7871d18e21f.

Reason for revert: Breaks chromium import bots:
https://ci.chromium.org/p/chromium/builders/webrtc.fyi/WebRTC%20Chromium%20FYI%20Android%20Tests%20%28dbg%29%20%28K%20Nexus5%29
First failure: https://ci.chromium.org/p/chromium/builders/webrtc.fyi/WebRTC%20Chromium%20FYI%20Android%20Tests%20%28dbg%29%20%28K%20Nexus5%29/2794

Original change's description:
> Running FrameBuffer on task queue.
>
> This prepares for running WebRTC in simulated time where event::Wait
> based timing doesn't work.
>
> Bug: webrtc:10365
> Change-Id: Ia0f9b1cc8e3c8c27a38e45b40487050a4699d8cf
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/129962
> Reviewed-by: Philip Eliasson <philipel@webrtc.org>
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#27422}

TBR=sprang@webrtc.org,philipel@webrtc.org,srte@webrtc.org

Change-Id: I198a91ec1707cc8752a7fe55caf0f172e1b8e60a
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:10365
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/131120
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27436}
---
 modules/video_coding/BUILD.gn         |   2 -
 modules/video_coding/frame_buffer2.cc | 488 ++++++++++----------
 modules/video_coding/frame_buffer2.h  |  36 +-
 video/video_receive_stream.cc         | 228 ++++--------
 video/video_receive_stream.h          |  16 +-
 5 files changed, 253 insertions(+), 517 deletions(-)

diff --git a/modules/video_coding/BUILD.gn b/modules/video_coding/BUILD.gn
index 5f366a6188..cdaff5d6ab 100644
--- a/modules/video_coding/BUILD.gn
+++ b/modules/video_coding/BUILD.gn
@@ -151,7 +151,6 @@ rtc_static_library("video_coding") {
     "..:module_api_public",
     "../../api:fec_controller_api",
     "../../api:rtp_headers",
-    "../../api/task_queue:global_task_queue_factory",
     "../../api/units:data_rate",
     "../../api/video:builtin_video_bitrate_allocator_factory",
     "../../api/video:encoded_frame",
@@ -171,7 +170,6 @@ rtc_static_library("video_coding") {
     "../../rtc_base/experiments:jitter_upper_bound_experiment",
     "../../rtc_base/experiments:rtt_mult_experiment",
     "../../rtc_base/system:fallthrough",
-    "../../rtc_base/task_utils:repeating_task",
     "../../rtc_base/third_party/base64",
     "../../rtc_base/time:timestamp_extrapolator",
     "../../system_wrappers",
diff --git a/modules/video_coding/frame_buffer2.cc b/modules/video_coding/frame_buffer2.cc
index a65401901d..ecc0e176f5 100644
--- a/modules/video_coding/frame_buffer2.cc
+++ b/modules/video_coding/frame_buffer2.cc
@@ -17,8 +17,6 @@
 #include <utility>
 #include <vector>

-#include "absl/memory/memory.h"
-#include "api/task_queue/global_task_queue_factory.h"
 #include "api/video/encoded_image.h"
 #include "api/video/video_timing.h"
 #include "modules/video_coding/include/video_coding_defines.h"
@@ -47,30 +45,14 @@ constexpr int kMaxFramesHistory = 1 << 13;
 constexpr int kMaxAllowedFrameDelayMs = 5;

 constexpr int64_t kLogNonDecodedIntervalMs = 5000;
-
-std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateQueue(
-    TaskQueueFactory* task_queue_factory) {
-  if (!task_queue_factory)
-    task_queue_factory = &GlobalTaskQueueFactory();
-  return task_queue_factory->CreateTaskQueue("FrameBuffer",
-                                             TaskQueueFactory::Priority::HIGH);
-}
 }  // namespace

 FrameBuffer::FrameBuffer(Clock* clock,
-                         VCMJitterEstimator* jitter_estimator,
-                         VCMTiming* timing,
-                         VCMReceiveStatisticsCallback* stats_proxy)
-    : FrameBuffer(clock, nullptr, jitter_estimator, timing, stats_proxy) {}
-
-FrameBuffer::FrameBuffer(Clock* clock,
-                         TaskQueueFactory* task_queue_factory,
                          VCMJitterEstimator* jitter_estimator,
                          VCMTiming* timing,
                          VCMReceiveStatisticsCallback* stats_callback)
     : decoded_frames_history_(kMaxFramesHistory),
       clock_(clock),
-      use_task_queue_(task_queue_factory != nullptr),
       jitter_estimator_(jitter_estimator),
       timing_(timing),
       inter_frame_delay_(clock_->TimeInMilliseconds()),
@@ -79,69 +61,14 @@ FrameBuffer::FrameBuffer(Clock* clock,
       stats_callback_(stats_callback),
       last_log_non_decoded_ms_(-kLogNonDecodedIntervalMs),
       add_rtt_to_playout_delay_(
-          webrtc::field_trial::IsEnabled("WebRTC-AddRttToPlayoutDelay")),
-      task_queue_(CreateQueue(task_queue_factory)) {}
+          webrtc::field_trial::IsEnabled("WebRTC-AddRttToPlayoutDelay")) {}

 FrameBuffer::~FrameBuffer() {}

-void FrameBuffer::NextFrame(
-    int64_t max_wait_time_ms,
-    bool keyframe_required,
-    std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler) {
-  RTC_DCHECK(use_task_queue_);
-  TRACE_EVENT0("webrtc", "FrameBuffer::NextFrame");
-  int64_t latest_return_time_ms =
-      clock_->TimeInMilliseconds() + max_wait_time_ms;
-  task_queue_.PostTask([=] {
-    RTC_DCHECK_RUN_ON(&task_queue_);
-    rtc::CritScope lock(&crit_);
-    if (stopped_) {
-      return;
-    }
-    latest_return_time_ms_ = latest_return_time_ms;
-    keyframe_required_ = keyframe_required;
-    frame_handler_ = handler;
-    NextFrameOnQueue();
-  });
-}
-
-void FrameBuffer::NextFrameOnQueue() {
-  RTC_DCHECK(use_task_queue_);
-  RTC_DCHECK(!callback_task_.Running());
-  int64_t wait_ms = UpdateFramesToDecode(clock_->TimeInMilliseconds());
-  callback_task_ = RepeatingTaskHandle::DelayedStart(
-      task_queue_.Get(), TimeDelta::ms(wait_ms), [this] {
-        // If this task has not been cancelled, we did not get any new frames
-        // while waiting. Continue with frame delivery.
-        RTC_DCHECK_RUN_ON(&task_queue_);
-        rtc::CritScope lock(&crit_);
-        if (!frames_to_decode_.empty()) {
-          // We have frames, deliver!
-          frame_handler_(absl::WrapUnique(GetFrameToDecode()), kFrameFound);
-          frame_handler_ = {};
-          callback_task_.Stop();
-          return TimeDelta::Zero();  // Ignored.
-        } else if (clock_->TimeInMilliseconds() >= latest_return_time_ms_) {
-          // We have timed out, signal this and stop repeating.
-          frame_handler_(nullptr, kTimeout);
-          frame_handler_ = {};
-          callback_task_.Stop();
-          return TimeDelta::Zero();  // Ignored.
-        } else {
-          // If there's no frames to decode and there is still time left, it
-          // means that the frame buffer was cleared between creation and
-          // execution of this task. Continue waiting for the remaining time.
-          int64_t wait_ms = UpdateFramesToDecode(clock_->TimeInMilliseconds());
-          return TimeDelta::ms(wait_ms);
-        }
-      });
-}
-
 FrameBuffer::ReturnReason FrameBuffer::NextFrame(
     int64_t max_wait_time_ms,
     std::unique_ptr<EncodedFrame>* frame_out,
     bool keyframe_required) {
-  RTC_DCHECK(!use_task_queue_);
   TRACE_EVENT0("webrtc", "FrameBuffer::NextFrame");
   int64_t latest_return_time_ms =
       clock_->TimeInMilliseconds() + max_wait_time_ms;
@@ -156,25 +83,183 @@ FrameBuffer::ReturnReason FrameBuffer::NextFrame(
     if (stopped_)
       return kStopped;

-      // Need to hold |crit_| in order to access the members. therefore we
+      wait_ms = max_wait_time_ms;
+
+      // Need to hold |crit_| in order to access frames_to_decode_. therefore we
       // set it here in the loop instead of outside the loop in order to not
       // acquire the lock unnecessarily.
-      keyframe_required_ = keyframe_required;
-      latest_return_time_ms_ = latest_return_time_ms;
-      wait_ms = UpdateFramesToDecode(now_ms);
-    }
+      frames_to_decode_.clear();
+
+      // |last_continuous_frame_| may be empty below, but nullopt is smaller
+      // than everything else and loop will immediately terminate as expected.
+      for (auto frame_it = frames_.begin();
+           frame_it != frames_.end() &&
+           frame_it->first <= last_continuous_frame_;
+           ++frame_it) {
+        if (!frame_it->second.continuous ||
+            frame_it->second.num_missing_decodable > 0) {
+          continue;
+        }
+
+        EncodedFrame* frame = frame_it->second.frame.get();
+
+        if (keyframe_required && !frame->is_keyframe())
+          continue;
+
+        auto last_decoded_frame_timestamp =
+            decoded_frames_history_.GetLastDecodedFrameTimestamp();
+
+        // TODO(https://bugs.webrtc.org/9974): consider removing this check
+        // as it may make a stream undecodable after a very long delay between
+        // frames.
+        if (last_decoded_frame_timestamp &&
+            AheadOf(*last_decoded_frame_timestamp, frame->Timestamp())) {
+          continue;
+        }
+
+        // Only ever return all parts of a superframe. Therefore skip this
+        // frame if it's not a beginning of a superframe.
+        if (frame->inter_layer_predicted) {
+          continue;
+        }
+
+        // Gather all remaining frames for the same superframe.
+        std::vector<FrameMap::iterator> current_superframe;
+        current_superframe.push_back(frame_it);
+        bool last_layer_completed =
+            frame_it->second.frame->is_last_spatial_layer;
+        FrameMap::iterator next_frame_it = frame_it;
+        while (true) {
+          ++next_frame_it;
+          if (next_frame_it == frames_.end() ||
+              next_frame_it->first.picture_id != frame->id.picture_id ||
+              !next_frame_it->second.continuous) {
+            break;
+          }
+          // Check if the next frame has some undecoded references other than
+          // the previous frame in the same superframe.
+          size_t num_allowed_undecoded_refs =
+              (next_frame_it->second.frame->inter_layer_predicted) ? 1 : 0;
+          if (next_frame_it->second.num_missing_decodable >
+              num_allowed_undecoded_refs) {
+            break;
+          }
+          // All frames in the superframe should have the same timestamp.
+          if (frame->Timestamp() != next_frame_it->second.frame->Timestamp()) {
+            RTC_LOG(LS_WARNING)
+                << "Frames in a single superframe have different"
+                   " timestamps. Skipping undecodable superframe.";
+            break;
+          }
+          current_superframe.push_back(next_frame_it);
+          last_layer_completed =
+              next_frame_it->second.frame->is_last_spatial_layer;
+        }
+        // Check if the current superframe is complete.
+        // TODO(bugs.webrtc.org/10064): consider returning all available to
+        // decode frames even if the superframe is not complete yet.
+        if (!last_layer_completed) {
+          continue;
+        }
+
+        frames_to_decode_ = std::move(current_superframe);
+
+        if (frame->RenderTime() == -1) {
+          frame->SetRenderTime(
+              timing_->RenderTimeMs(frame->Timestamp(), now_ms));
+        }
+        wait_ms = timing_->MaxWaitingTime(frame->RenderTime(), now_ms);
+
+        // This will cause the frame buffer to prefer high framerate rather
+        // than high resolution in the case of the decoder not decoding fast
+        // enough and the stream has multiple spatial and temporal layers.
+        // For multiple temporal layers it may cause non-base layer frames to be
+        // skipped if they are late.
+        if (wait_ms < -kMaxAllowedFrameDelayMs)
+          continue;
+
+        break;
+      }
+    }  // rtc::Critscope lock(&crit_);
+
+    wait_ms = std::min<int64_t>(wait_ms, latest_return_time_ms - now_ms);
+    wait_ms = std::max<int64_t>(wait_ms, 0);
   } while (new_continuous_frame_event_.Wait(wait_ms));

   {
     rtc::CritScope lock(&crit_);
+    now_ms = clock_->TimeInMilliseconds();
+    // TODO(ilnik): remove |frames_out| use frames_to_decode_ directly.
+    std::vector<EncodedFrame*> frames_out;

     if (!frames_to_decode_.empty()) {
-      frame_out->reset(GetFrameToDecode());
+      bool superframe_delayed_by_retransmission = false;
+      size_t superframe_size = 0;
+      EncodedFrame* first_frame = frames_to_decode_[0]->second.frame.get();
+      int64_t render_time_ms = first_frame->RenderTime();
+      int64_t receive_time_ms = first_frame->ReceivedTime();
+      // Gracefully handle bad RTP timestamps and render time issues.
+      if (HasBadRenderTiming(*first_frame, now_ms)) {
+        jitter_estimator_->Reset();
+        timing_->Reset();
+        render_time_ms =
+            timing_->RenderTimeMs(first_frame->Timestamp(), now_ms);
+      }
+
+      for (FrameMap::iterator& frame_it : frames_to_decode_) {
+        RTC_DCHECK(frame_it != frames_.end());
+        EncodedFrame* frame = frame_it->second.frame.release();
+
+        frame->SetRenderTime(render_time_ms);
+
+        superframe_delayed_by_retransmission |=
+            frame->delayed_by_retransmission();
+        receive_time_ms = std::max(receive_time_ms, frame->ReceivedTime());
+        superframe_size += frame->size();
+
+        PropagateDecodability(frame_it->second);
+        decoded_frames_history_.InsertDecoded(frame_it->first,
+                                              frame->Timestamp());
+
+        // Remove decoded frame and all undecoded frames before it.
+        frames_.erase(frames_.begin(), ++frame_it);
+
+        frames_out.push_back(frame);
+      }
+
+      if (!superframe_delayed_by_retransmission) {
+        int64_t frame_delay;
+
+        if (inter_frame_delay_.CalculateDelay(first_frame->Timestamp(),
+                                              &frame_delay, receive_time_ms)) {
+          jitter_estimator_->UpdateEstimate(frame_delay, superframe_size);
+        }
+
+        float rtt_mult = protection_mode_ == kProtectionNackFEC ? 0.0 : 1.0;
+        if (RttMultExperiment::RttMultEnabled()) {
+          rtt_mult = RttMultExperiment::GetRttMultValue();
+        }
+        timing_->SetJitterDelay(jitter_estimator_->GetJitterEstimate(rtt_mult));
+        timing_->UpdateCurrentDelay(render_time_ms, now_ms);
+      } else {
+        if (RttMultExperiment::RttMultEnabled() || add_rtt_to_playout_delay_)
+          jitter_estimator_->FrameNacked();
+      }
+
+      UpdateJitterDelay();
+      UpdateTimingFrameInfo();
+    }
+
+    if (!frames_out.empty()) {
+      if (frames_out.size() == 1) {
+        frame_out->reset(frames_out[0]);
+      } else {
+        frame_out->reset(CombineAndDeleteFrames(frames_out));
+      }
       return kFrameFound;
     }
-  }
+  }  // rtc::Critscope lock(&crit_)

-  if (latest_return_time_ms - clock_->TimeInMilliseconds() > 0) {
+  if (latest_return_time_ms - now_ms > 0) {
     // If |next_frame_it_ == frames_.end()| and there is still time left, it
     // means that the frame buffer was cleared as the thread in this function
     // was waiting to acquire |crit_| in order to return. Wait for the
@@ -184,166 +269,6 @@ FrameBuffer::ReturnReason FrameBuffer::NextFrame(
   return kTimeout;
 }

-int64_t FrameBuffer::UpdateFramesToDecode(int64_t now_ms) {
-  int64_t wait_ms = latest_return_time_ms_ - now_ms;
-  frames_to_decode_.clear();
-
-  // |last_continuous_frame_| may be empty below, but nullopt is smaller
-  // than everything else and loop will immediately terminate as expected.
-  for (auto frame_it = frames_.begin();
-       frame_it != frames_.end() && frame_it->first <= last_continuous_frame_;
-       ++frame_it) {
-    if (!frame_it->second.continuous ||
-        frame_it->second.num_missing_decodable > 0) {
-      continue;
-    }
-
-    EncodedFrame* frame = frame_it->second.frame.get();
-
-    if (keyframe_required_ && !frame->is_keyframe())
-      continue;
-
-    auto last_decoded_frame_timestamp =
-        decoded_frames_history_.GetLastDecodedFrameTimestamp();
-
-    // TODO(https://bugs.webrtc.org/9974): consider removing this check
-    // as it may make a stream undecodable after a very long delay between
-    // frames.
-    if (last_decoded_frame_timestamp &&
-        AheadOf(*last_decoded_frame_timestamp, frame->Timestamp())) {
-      continue;
-    }
-
-    // Only ever return all parts of a superframe. Therefore skip this
-    // frame if it's not a beginning of a superframe.
-    if (frame->inter_layer_predicted) {
-      continue;
-    }
-
-    // Gather all remaining frames for the same superframe.
-    std::vector<FrameMap::iterator> current_superframe;
-    current_superframe.push_back(frame_it);
-    bool last_layer_completed = frame_it->second.frame->is_last_spatial_layer;
-    FrameMap::iterator next_frame_it = frame_it;
-    while (true) {
-      ++next_frame_it;
-      if (next_frame_it == frames_.end() ||
-          next_frame_it->first.picture_id != frame->id.picture_id ||
-          !next_frame_it->second.continuous) {
-        break;
-      }
-      // Check if the next frame has some undecoded references other than
-      // the previous frame in the same superframe.
-      size_t num_allowed_undecoded_refs =
-          (next_frame_it->second.frame->inter_layer_predicted) ? 1 : 0;
-      if (next_frame_it->second.num_missing_decodable >
-          num_allowed_undecoded_refs) {
-        break;
-      }
-      // All frames in the superframe should have the same timestamp.
-      if (frame->Timestamp() != next_frame_it->second.frame->Timestamp()) {
-        RTC_LOG(LS_WARNING) << "Frames in a single superframe have different"
-                               " timestamps. Skipping undecodable superframe.";
-        break;
-      }
-      current_superframe.push_back(next_frame_it);
-      last_layer_completed = next_frame_it->second.frame->is_last_spatial_layer;
-    }
-    // Check if the current superframe is complete.
-    // TODO(bugs.webrtc.org/10064): consider returning all available to
-    // decode frames even if the superframe is not complete yet.
-    if (!last_layer_completed) {
-      continue;
-    }
-
-    frames_to_decode_ = std::move(current_superframe);
-
-    if (frame->RenderTime() == -1) {
-      frame->SetRenderTime(timing_->RenderTimeMs(frame->Timestamp(), now_ms));
-    }
-    wait_ms = timing_->MaxWaitingTime(frame->RenderTime(), now_ms);
-
-    // This will cause the frame buffer to prefer high framerate rather
-    // than high resolution in the case of the decoder not decoding fast
-    // enough and the stream has multiple spatial and temporal layers.
-    // For multiple temporal layers it may cause non-base layer frames to be
-    // skipped if they are late.
-    if (wait_ms < -kMaxAllowedFrameDelayMs)
-      continue;
-
-    break;
-  }
-  wait_ms = std::min<int64_t>(wait_ms, latest_return_time_ms_ - now_ms);
-  wait_ms = std::max<int64_t>(wait_ms, 0);
-  return wait_ms;
-}
-
-EncodedFrame* FrameBuffer::GetFrameToDecode() {
-  int64_t now_ms = clock_->TimeInMilliseconds();
-  // TODO(ilnik): remove |frames_out| use frames_to_decode_ directly.
-  std::vector<EncodedFrame*> frames_out;
-
-  RTC_DCHECK(!frames_to_decode_.empty());
-  bool superframe_delayed_by_retransmission = false;
-  size_t superframe_size = 0;
-  EncodedFrame* first_frame = frames_to_decode_[0]->second.frame.get();
-  int64_t render_time_ms = first_frame->RenderTime();
-  int64_t receive_time_ms = first_frame->ReceivedTime();
-  // Gracefully handle bad RTP timestamps and render time issues.
-  if (HasBadRenderTiming(*first_frame, now_ms)) {
-    jitter_estimator_->Reset();
-    timing_->Reset();
-    render_time_ms = timing_->RenderTimeMs(first_frame->Timestamp(), now_ms);
-  }
-
-  for (FrameMap::iterator& frame_it : frames_to_decode_) {
-    RTC_DCHECK(frame_it != frames_.end());
-    EncodedFrame* frame = frame_it->second.frame.release();
-
-    frame->SetRenderTime(render_time_ms);
-
-    superframe_delayed_by_retransmission |= frame->delayed_by_retransmission();
-    receive_time_ms = std::max(receive_time_ms, frame->ReceivedTime());
-    superframe_size += frame->size();
-
-    PropagateDecodability(frame_it->second);
-    decoded_frames_history_.InsertDecoded(frame_it->first, frame->Timestamp());
-
-    // Remove decoded frame and all undecoded frames before it.
-    frames_.erase(frames_.begin(), ++frame_it);
-
-    frames_out.push_back(frame);
-  }
-
-  if (!superframe_delayed_by_retransmission) {
-    int64_t frame_delay;
-
-    if (inter_frame_delay_.CalculateDelay(first_frame->Timestamp(),
-                                          &frame_delay, receive_time_ms)) {
-      jitter_estimator_->UpdateEstimate(frame_delay, superframe_size);
-    }
-
-    float rtt_mult = protection_mode_ == kProtectionNackFEC ? 0.0 : 1.0;
-    if (RttMultExperiment::RttMultEnabled()) {
-      rtt_mult = RttMultExperiment::GetRttMultValue();
-    }
-    timing_->SetJitterDelay(jitter_estimator_->GetJitterEstimate(rtt_mult));
-    timing_->UpdateCurrentDelay(render_time_ms, now_ms);
-  } else {
-    if (RttMultExperiment::RttMultEnabled() || add_rtt_to_playout_delay_)
-      jitter_estimator_->FrameNacked();
-  }
-
-  UpdateJitterDelay();
-  UpdateTimingFrameInfo();
-
-  if (frames_out.size() == 1) {
-    return frames_out[0];
-  } else {
-    return CombineAndDeleteFrames(frames_out);
-  }
-}
-
 bool FrameBuffer::HasBadRenderTiming(const EncodedFrame& frame,
                                      int64_t now_ms) {
   // Assume that render timing errors are due to changes in the video stream.
@@ -372,63 +297,33 @@ bool FrameBuffer::HasBadRenderTiming(const EncodedFrame& frame,
   return false;
 }

-void FrameBuffer::SafePost(std::function<void()> func) {
-  if (!use_task_queue_) {
-    func();
-  } else {
-    task_queue_.PostTask(func);
-  }
-}
-
 void FrameBuffer::SetProtectionMode(VCMVideoProtection mode) {
   TRACE_EVENT0("webrtc", "FrameBuffer::SetProtectionMode");
-  SafePost([this, mode] {
-    rtc::CritScope lock(&crit_);
-    protection_mode_ = mode;
-  });
+  rtc::CritScope lock(&crit_);
+  protection_mode_ = mode;
 }

 void FrameBuffer::Start() {
   TRACE_EVENT0("webrtc", "FrameBuffer::Start");
-  SafePost([this] {
-    rtc::CritScope lock(&crit_);
-    stopped_ = false;
-  });
+  rtc::CritScope lock(&crit_);
+  stopped_ = false;
 }

 void FrameBuffer::Stop() {
   TRACE_EVENT0("webrtc", "FrameBuffer::Stop");
-  if (!use_task_queue_) {
-    rtc::CritScope lock(&crit_);
-    stopped_ = true;
-    new_continuous_frame_event_.Set();
-  } else {
-    rtc::Event done;
-    task_queue_.PostTask([this, &done] {
-      rtc::CritScope lock(&crit_);
-      stopped_ = true;
-      if (frame_handler_) {
-        RTC_DCHECK(callback_task_.Running());
-        callback_task_.Stop();
-        frame_handler_ = {};
-      }
-      done.Set();
-    });
-    done.Wait(rtc::Event::kForever);
-  }
+  rtc::CritScope lock(&crit_);
+  stopped_ = true;
+  new_continuous_frame_event_.Set();
 }

 void FrameBuffer::Clear() {
-  SafePost([this] {
-    rtc::CritScope lock(&crit_);
-    ClearFramesAndHistory();
-  });
+  rtc::CritScope lock(&crit_);
+  ClearFramesAndHistory();
 }

 void FrameBuffer::UpdateRtt(int64_t rtt_ms) {
-  SafePost([this, rtt_ms] {
-    rtc::CritScope lock(&crit_);
-    jitter_estimator_->UpdateRtt(rtt_ms);
-  });
+  rtc::CritScope lock(&crit_);
+  jitter_estimator_->UpdateRtt(rtt_ms);
 }

 bool FrameBuffer::ValidReferences(const EncodedFrame& frame) const {
@@ -489,22 +384,6 @@ bool FrameBuffer::IsCompleteSuperFrame(const EncodedFrame& frame) {
   return true;
 }

-void FrameBuffer::InsertFrame(std::unique_ptr<EncodedFrame> frame,
-                              std::function<void(int64_t)> picture_id_handler) {
-  struct InsertFrameTask {
-    void operator()() {
-      RTC_DCHECK_RUN_ON(&frame_buffer->task_queue_);
-      int64_t last_continuous_pid = frame_buffer->InsertFrame(std::move(frame));
-      picture_id_handler(last_continuous_pid);
-    }
-    FrameBuffer* frame_buffer;
-    std::unique_ptr<EncodedFrame> frame;
-    std::function<void(int64_t)> picture_id_handler;
-  };
-  task_queue_.PostTask(
-      InsertFrameTask{this, std::move(frame), std::move(picture_id_handler)});
-}
-
 int64_t FrameBuffer::InsertFrame(std::unique_ptr<EncodedFrame> frame) {
   TRACE_EVENT0("webrtc", "FrameBuffer::InsertFrame");
   RTC_DCHECK(frame);
@@ -608,14 +487,9 @@ int64_t FrameBuffer::InsertFrame(std::unique_ptr<EncodedFrame> frame) {
     last_continuous_picture_id = last_continuous_frame_->picture_id;

    // Since we now have new continuous frames there might be a better frame
-    // to return from NextFrame.
-    if (!use_task_queue_) {
-      new_continuous_frame_event_.Set();
-    } else if (callback_task_.Running()) {
-      RTC_CHECK(frame_handler_);
-      callback_task_.Stop();
-      NextFrameOnQueue();
-    }
+    // to return from NextFrame. Signal that thread so that it again can choose
+    // which frame to return.
+    new_continuous_frame_event_.Set();
   }

   return last_continuous_picture_id;
diff --git a/modules/video_coding/frame_buffer2.h b/modules/video_coding/frame_buffer2.h
index 7772167da8..fda496eda0 100644
--- a/modules/video_coding/frame_buffer2.h
+++ b/modules/video_coding/frame_buffer2.h
@@ -27,8 +27,6 @@
 #include "rtc_base/event.h"
 #include "rtc_base/experiments/rtt_mult_experiment.h"
 #include "rtc_base/numerics/sequence_number_util.h"
-#include "rtc_base/task_queue.h"
-#include "rtc_base/task_utils/repeating_task.h"
 #include "rtc_base/thread_annotations.h"

 namespace webrtc {
@@ -47,13 +45,7 @@ class FrameBuffer {
   FrameBuffer(Clock* clock,
               VCMJitterEstimator* jitter_estimator,
               VCMTiming* timing,
-              VCMReceiveStatisticsCallback* stats_callback);
-
-  FrameBuffer(Clock* clock,
-              TaskQueueFactory* task_queue_factory,
-              VCMJitterEstimator* jitter_estimator,
-              VCMTiming* timing,
-              VCMReceiveStatisticsCallback* stats_callback);
+              VCMReceiveStatisticsCallback* stats_proxy);

   virtual ~FrameBuffer();

@@ -62,9 +54,6 @@ class FrameBuffer {
   // TODO(philipel): Return a VideoLayerFrameId and not only the picture id.
   int64_t InsertFrame(std::unique_ptr<EncodedFrame> frame);

-  void InsertFrame(std::unique_ptr<EncodedFrame> frame,
-                   std::function<void(int64_t)> picture_id_handler);
-
   // Get the next frame for decoding. Will return at latest after
   // |max_wait_time_ms|.
   //  - If a frame is available within |max_wait_time_ms| it will return
@@ -75,10 +64,6 @@ class FrameBuffer {
   ReturnReason NextFrame(int64_t max_wait_time_ms,
                          std::unique_ptr<EncodedFrame>* frame_out,
                          bool keyframe_required = false);
-  void NextFrame(
-      int64_t max_wait_time_ms,
-      bool keyframe_required,
-      std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)> handler);

   // Tells the FrameBuffer which protection mode that is in use. Affects
   // the frame timing.
@@ -130,16 +115,9 @@ class FrameBuffer {

   using FrameMap = std::map<VideoLayerFrameId, FrameInfo>;

-  void SafePost(std::function<void()> func);
-
   // Check that the references of |frame| are valid.
   bool ValidReferences(const EncodedFrame& frame) const;

-  void NextFrameOnQueue() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
-  int64_t UpdateFramesToDecode(int64_t now_ms)
-      RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
-  EncodedFrame* GetFrameToDecode() RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
-
   // Update all directly dependent and indirectly dependent frames and mark
   // them as continuous if all their references has been fulfilled.
   void PropagateContinuity(FrameMap::iterator start)
@@ -180,19 +158,9 @@ class FrameBuffer {
   FrameMap frames_ RTC_GUARDED_BY(crit_);
   DecodedFramesHistory decoded_frames_history_ RTC_GUARDED_BY(crit_);

-  // TODO(srte): Remove this lock when always running on task queue.
   rtc::CriticalSection crit_;
   Clock* const clock_;
-  const bool use_task_queue_;
-
-  RepeatingTaskHandle callback_task_ RTC_GUARDED_BY(crit_);
-  std::function<void(std::unique_ptr<EncodedFrame>, ReturnReason)>
-      frame_handler_ RTC_GUARDED_BY(crit_);
-  int64_t latest_return_time_ms_ RTC_GUARDED_BY(crit_);
-  bool keyframe_required_ RTC_GUARDED_BY(crit_);
-
   rtc::Event new_continuous_frame_event_;
-
   VCMJitterEstimator* const jitter_estimator_ RTC_GUARDED_BY(crit_);
   VCMTiming* const timing_ RTC_GUARDED_BY(crit_);
   VCMInterFrameDelay inter_frame_delay_ RTC_GUARDED_BY(crit_);
@@ -206,8 +174,6 @@ class FrameBuffer {

   const bool add_rtt_to_playout_delay_;

-  // Defined last so it is destroyed before other members.
-  rtc::TaskQueue task_queue_;
   RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(FrameBuffer);
 };

diff --git a/video/video_receive_stream.cc b/video/video_receive_stream.cc
index 1ded35f017..86b9a09d19 100644
--- a/video/video_receive_stream.cc
+++ b/video/video_receive_stream.cc
@@ -56,10 +56,6 @@
 namespace webrtc {

 namespace {
-
-using video_coding::EncodedFrame;
-using ReturnReason = video_coding::FrameBuffer::ReturnReason;
-
 constexpr int kMinBaseMinimumDelayMs = 0;
 constexpr int kMaxBaseMinimumDelayMs = 10000;

@@ -188,8 +184,6 @@ VideoReceiveStream::VideoReceiveStream(
       num_cpu_cores_(num_cpu_cores),
       process_thread_(process_thread),
       clock_(clock),
-      use_task_queue_(
-          !field_trial::IsDisabled("WebRTC-Video-DecodeOnTaskQueue")),
       decode_thread_(&DecodeThreadFunction,
                      this,
                      "DecodingThread",
@@ -218,10 +212,7 @@ VideoReceiveStream::VideoReceiveStream(
                                      .value_or(kMaxWaitForKeyFrameMs)),
       max_wait_for_frame_ms_(KeyframeIntervalSettings::ParseFromFieldTrials()
                                  .MaxWaitForFrameMs()
-                                 .value_or(kMaxWaitForFrameMs)),
-      decode_queue_(task_queue_factory_->CreateTaskQueue(
-          "DecodingQueue",
-          TaskQueueFactory::Priority::HIGH)) {
+                                 .value_or(kMaxWaitForFrameMs)) {
   RTC_LOG(LS_INFO) << "VideoReceiveStream: " << config_.ToString();

   RTC_DCHECK(config_.renderer);
@@ -246,8 +237,7 @@ VideoReceiveStream::VideoReceiveStream(
   jitter_estimator_.reset(new VCMJitterEstimator(clock_));
   frame_buffer_.reset(new video_coding::FrameBuffer(
-      clock_, use_task_queue_ ? task_queue_factory_ : nullptr,
-      jitter_estimator_.get(), timing_.get(), &stats_proxy_));
+      clock_, jitter_estimator_.get(), timing_.get(), &stats_proxy_));

   process_thread_->RegisterModule(&rtp_stream_sync_, RTC_FROM_HERE);
@@ -318,7 +308,7 @@ void VideoReceiveStream::SetSync(Syncable* audio_syncable) {
 void VideoReceiveStream::Start() {
   RTC_DCHECK_CALLED_SEQUENTIALLY(&worker_sequence_checker_);

-  if (decoder_running_) {
+  if (decode_thread_.IsRunning()) {
     return;
   }

@@ -397,17 +387,7 @@ void VideoReceiveStream::Start() {
   // Start the decode thread
   video_receiver_.DecoderThreadStarting();
   stats_proxy_.DecoderThreadStarting();
-  if (!use_task_queue_) {
-    decode_thread_.Start();
-  } else {
-    decode_queue_.PostTask([this] {
-      RTC_DCHECK_RUN_ON(&decode_queue_);
-      RTC_DCHECK(decoder_stopped_);
-      decoder_stopped_ = false;
-      StartNextDecode();
-    });
-  }
-  decoder_running_ = true;
+  decode_thread_.Start();
   rtp_video_stream_receiver_.StartReceive();
 }

@@ -421,24 +401,13 @@ void VideoReceiveStream::Stop() {
   frame_buffer_->Stop();
   call_stats_->DeregisterStatsObserver(this);

-  if (decoder_running_) {
+  if (decode_thread_.IsRunning()) {
    // TriggerDecoderShutdown will release any waiting decoder thread and make
    // it stop immediately, instead of waiting for a timeout. Needs to be called
    // before joining the decoder thread.
    video_receiver_.TriggerDecoderShutdown();
-    if (!use_task_queue_) {
-      decode_thread_.Stop();
-    } else {
-      rtc::Event done;
-      decode_queue_.PostTask([this, &done] {
-        RTC_DCHECK_RUN_ON(&decode_queue_);
-        decoder_stopped_ = true;
-        done.Set();
-      });
-      done.Wait(rtc::Event::kForever);
-    }
-    decoder_running_ = false;
+    decode_thread_.Stop();
     video_receiver_.DecoderThreadStopped();
     stats_proxy_.DecoderThreadStopped();
     // Deregister external decoders so they are no longer running during
@@ -542,17 +511,10 @@ void VideoReceiveStream::OnCompleteFrame(
     frame_maximum_playout_delay_ms_ = playout_delay.max_ms;
     UpdatePlayoutDelays();
   }
-  if (!use_task_queue_) {
-    int64_t last_continuous_pid = frame_buffer_->InsertFrame(std::move(frame));
-    if (last_continuous_pid != -1)
-      rtp_video_stream_receiver_.FrameContinuous(last_continuous_pid);
-  } else {
-    frame_buffer_->InsertFrame(
-        std::move(frame), [this](int64_t last_continuous_pid) {
-          if (last_continuous_pid != -1)
-            rtp_video_stream_receiver_.FrameContinuous(last_continuous_pid);
-        });
-  }
+
+  int64_t last_continuous_pid = frame_buffer_->InsertFrame(std::move(frame));
+  if (last_continuous_pid != -1)
+    rtp_video_stream_receiver_.FrameContinuous(last_continuous_pid);
 }

 void VideoReceiveStream::OnData(uint64_t channel_id,
@@ -600,51 +562,6 @@ void VideoReceiveStream::SetMinimumPlayoutDelay(int delay_ms) {
   UpdatePlayoutDelays();
 }

-int64_t VideoReceiveStream::GetWaitMs() const {
-  return keyframe_required_ ? max_wait_for_keyframe_ms_
-                            : max_wait_for_frame_ms_;
-}
-
-void VideoReceiveStream::StartNextDecode() {
-  RTC_DCHECK(use_task_queue_);
-  TRACE_EVENT0("webrtc", "VideoReceiveStream::StartNextDecode");
-
-  struct DecodeTask {
-    void operator()() {
-      RTC_DCHECK_RUN_ON(&stream->decode_queue_);
-      if (stream->decoder_stopped_)
-        return;
-      if (frame) {
-        stream->HandleEncodedFrame(std::move(frame));
-      } else {
-        stream->HandleFrameBufferTimeout();
-      }
-    }
-    VideoReceiveStream* stream;
-    std::unique_ptr<EncodedFrame> frame;
-  };
-
-  // TODO(philipel): Call NextFrame with |keyframe_required| argument set when
-  // downstream project has been fixed.
-  frame_buffer_->NextFrame(
-      GetWaitMs(), /*keyframe_required*/ false,
-      [this](std::unique_ptr<EncodedFrame> frame, ReturnReason res) {
-        RTC_DCHECK_EQ(frame == nullptr, res == ReturnReason::kTimeout);
-        RTC_DCHECK_EQ(frame != nullptr, res == ReturnReason::kFrameFound);
-        decode_queue_.PostTask(DecodeTask{this, std::move(frame)});
-        // Start the next decode after a delay or when the previous decode is
-        // finished (as it will be blocked by the queue).
-        constexpr int kMinDecodeIntervalMs = 1;
-        decode_queue_.PostDelayedTask(
-            [this] {
-              RTC_DCHECK_RUN_ON(&decode_queue_);
-              if (!decoder_stopped_)
-                StartNextDecode();
-            },
-            kMinDecodeIntervalMs);
-      });
-}
-
 void VideoReceiveStream::DecodeThreadFunction(void* ptr) {
   ScopedRegisterThreadForDebugging thread_dbg(RTC_FROM_HERE);
   while (static_cast<VideoReceiveStream*>(ptr)->Decode()) {
@@ -652,85 +569,80 @@
   }
 }

 bool VideoReceiveStream::Decode() {
-  RTC_DCHECK(!use_task_queue_);
   TRACE_EVENT0("webrtc", "VideoReceiveStream::Decode");
+  const int wait_ms =
+      keyframe_required_ ? max_wait_for_keyframe_ms_ : max_wait_for_frame_ms_;
   std::unique_ptr<video_coding::EncodedFrame> frame;
   // TODO(philipel): Call NextFrame with |keyframe_required| argument when
   // downstream project has been fixed.
   video_coding::FrameBuffer::ReturnReason res =
-      frame_buffer_->NextFrame(GetWaitMs(), &frame);
-  if (res == ReturnReason::kStopped) {
+      frame_buffer_->NextFrame(wait_ms, &frame);
+
+  if (res == video_coding::FrameBuffer::ReturnReason::kStopped) {
     return false;
   }
+
   if (frame) {
-    RTC_DCHECK_EQ(res, ReturnReason::kFrameFound);
-    HandleEncodedFrame(std::move(frame));
+    int64_t now_ms = clock_->TimeInMilliseconds();
+    RTC_DCHECK_EQ(res, video_coding::FrameBuffer::ReturnReason::kFrameFound);
+
+    // Current OnPreDecode only cares about QP for VP8.
+    int qp = -1;
+    if (frame->CodecSpecific()->codecType == kVideoCodecVP8) {
+      if (!vp8::GetQp(frame->data(), frame->size(), &qp)) {
+        RTC_LOG(LS_WARNING) << "Failed to extract QP from VP8 video frame";
+      }
+    }
+    stats_proxy_.OnPreDecode(frame->CodecSpecific()->codecType, qp);
+
+    int decode_result = video_receiver_.Decode(frame.get());
+    if (decode_result == WEBRTC_VIDEO_CODEC_OK ||
+        decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME) {
+      keyframe_required_ = false;
+      frame_decoded_ = true;
+      rtp_video_stream_receiver_.FrameDecoded(frame->id.picture_id);
+
+      if (decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME)
+        RequestKeyFrame();
+    } else if (!frame_decoded_ || !keyframe_required_ ||
+               (last_keyframe_request_ms_ + max_wait_for_keyframe_ms_ <
+                now_ms)) {
+      keyframe_required_ = true;
+      // TODO(philipel): Remove this keyframe request when downstream project
+      // has been fixed.
+      RequestKeyFrame();
+      last_keyframe_request_ms_ = now_ms;
+    }
   } else {
-    RTC_DCHECK_EQ(res, ReturnReason::kTimeout);
-    HandleFrameBufferTimeout();
-  }
-  return true;
-}
+    RTC_DCHECK_EQ(res, video_coding::FrameBuffer::ReturnReason::kTimeout);
+    int64_t now_ms = clock_->TimeInMilliseconds();
+    absl::optional<int64_t> last_packet_ms =
+        rtp_video_stream_receiver_.LastReceivedPacketMs();
+    absl::optional<int64_t> last_keyframe_packet_ms =
+        rtp_video_stream_receiver_.LastReceivedKeyframePacketMs();

-void VideoReceiveStream::HandleEncodedFrame(
-    std::unique_ptr<EncodedFrame> frame) {
-  int64_t now_ms = clock_->TimeInMilliseconds();
+    // To avoid spamming keyframe requests for a stream that is not active we
+    // check if we have received a packet within the last 5 seconds.
+    bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000;
+    if (!stream_is_active)
+      stats_proxy_.OnStreamInactive();

-  // Current OnPreDecode only cares about QP for VP8.
-  int qp = -1;
-  if (frame->CodecSpecific()->codecType == kVideoCodecVP8) {
-    if (!vp8::GetQp(frame->data(), frame->size(), &qp)) {
-      RTC_LOG(LS_WARNING) << "Failed to extract QP from VP8 video frame";
+    // If we recently have been receiving packets belonging to a keyframe then
+    // we assume a keyframe is currently being received.
+    bool receiving_keyframe =
+        last_keyframe_packet_ms &&
+        now_ms - *last_keyframe_packet_ms < max_wait_for_keyframe_ms_;
+
+    if (stream_is_active && !receiving_keyframe &&
+        (!config_.crypto_options.sframe.require_frame_encryption ||
+         rtp_video_stream_receiver_.IsDecryptable())) {
+      RTC_LOG(LS_WARNING) << "No decodable frame in " << wait_ms
+                          << " ms, requesting keyframe.";
+      RequestKeyFrame();
     }
   }
-  stats_proxy_.OnPreDecode(frame->CodecSpecific()->codecType, qp);
-
-  int decode_result = video_receiver_.Decode(frame.get());
-  if (decode_result == WEBRTC_VIDEO_CODEC_OK ||
-      decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME) {
-    keyframe_required_ = false;
-    frame_decoded_ = true;
-    rtp_video_stream_receiver_.FrameDecoded(frame->id.picture_id);
-
-    if (decode_result == WEBRTC_VIDEO_CODEC_OK_REQUEST_KEYFRAME)
-      RequestKeyFrame();
-  } else if (!frame_decoded_ || !keyframe_required_ ||
-             (last_keyframe_request_ms_ + max_wait_for_keyframe_ms_ < now_ms)) {
-    keyframe_required_ = true;
-    // TODO(philipel): Remove this keyframe request when downstream project
-    // has been fixed.
-    RequestKeyFrame();
-    last_keyframe_request_ms_ = now_ms;
-  }
-}
-
-void VideoReceiveStream::HandleFrameBufferTimeout() {
-  int64_t now_ms = clock_->TimeInMilliseconds();
-  absl::optional<int64_t> last_packet_ms =
-      rtp_video_stream_receiver_.LastReceivedPacketMs();
-  absl::optional<int64_t> last_keyframe_packet_ms =
-      rtp_video_stream_receiver_.LastReceivedKeyframePacketMs();
-
-  // To avoid spamming keyframe requests for a stream that is not active we
-  // check if we have received a packet within the last 5 seconds.
-  bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000;
-  if (!stream_is_active)
-    stats_proxy_.OnStreamInactive();
-
-  // If we recently have been receiving packets belonging to a keyframe then
-  // we assume a keyframe is currently being received.
-  bool receiving_keyframe =
-      last_keyframe_packet_ms &&
-      now_ms - *last_keyframe_packet_ms < max_wait_for_keyframe_ms_;
-
-  if (stream_is_active && !receiving_keyframe &&
-      (!config_.crypto_options.sframe.require_frame_encryption ||
-       rtp_video_stream_receiver_.IsDecryptable())) {
-    RTC_LOG(LS_WARNING) << "No decodable frame in " << GetWaitMs()
-                        << " ms, requesting keyframe.";
-    RequestKeyFrame();
-  }
+  return true;
 }

 void VideoReceiveStream::UpdatePlayoutDelays() const {
diff --git a/video/video_receive_stream.h b/video/video_receive_stream.h
index bc2469cf95..162ef8c8ee 100644
--- a/video/video_receive_stream.h
+++ b/video/video_receive_stream.h
@@ -23,7 +23,6 @@
 #include "modules/video_coding/frame_buffer2.h"
 #include "modules/video_coding/video_coding_impl.h"
 #include "rtc_base/sequenced_task_checker.h"
-#include "rtc_base/task_queue.h"
 #include "system_wrappers/include/clock.h"
 #include "video/receive_statistics_proxy.h"
 #include "video/rtp_streams_synchronizer.h"
@@ -130,13 +129,8 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
   std::vector<webrtc::RtpSource> GetSources() const override;

  private:
-  int64_t GetWaitMs() const;
-  void StartNextDecode() RTC_RUN_ON(decode_queue_);
   static void DecodeThreadFunction(void* ptr);
   bool Decode();
-  void HandleEncodedFrame(std::unique_ptr<video_coding::EncodedFrame> frame);
-  void HandleFrameBufferTimeout();
-
   void UpdatePlayoutDelays() const
       RTC_EXCLUSIVE_LOCKS_REQUIRED(playout_delay_lock_);

@@ -152,15 +146,10 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
   ProcessThread* const process_thread_;
   Clock* const clock_;

-  const bool use_task_queue_;
-
   rtc::PlatformThread decode_thread_;

   CallStats* const call_stats_;

-  bool decoder_running_ RTC_GUARDED_BY(worker_sequence_checker_) = false;
-  bool decoder_stopped_ RTC_GUARDED_BY(decode_queue_) = true;
-
   ReceiveStatisticsProxy stats_proxy_;
   // Shared by media and rtx stream receivers, since the latter has no RtpRtcp
   // module of its own.
@@ -176,10 +165,10 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,
   // TODO(nisse, philipel): Creation and ownership of video encoders should be
   // moved to the new VideoStreamDecoder.
   std::vector<std::unique_ptr<VideoDecoder>> video_decoders_;
-  std::unique_ptr<video_coding::FrameBuffer> frame_buffer_;

   // Members for the new jitter buffer experiment.
   std::unique_ptr<VCMJitterEstimator> jitter_estimator_;
+  std::unique_ptr<video_coding::FrameBuffer> frame_buffer_;

   std::unique_ptr<RtpStreamReceiverInterface> media_receiver_;
   std::unique_ptr<RtxReceiveStream> rtx_receive_stream_;
@@ -215,9 +204,6 @@ class VideoReceiveStream : public webrtc::VideoReceiveStream,

   // Maximum delay as decided by the RTP playout delay extension.
   int frame_maximum_playout_delay_ms_ RTC_GUARDED_BY(playout_delay_lock_) = -1;
-
-  // Defined last so they are destroyed before all other members.
-  rtc::TaskQueue decode_queue_;
 };
 }  // namespace internal
 }  // namespace webrtc
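
For reference, the consumption pattern this revert reinstates is the blocking, event-driven one: a dedicated decode thread calls FrameBuffer::NextFrame() and waits on an rtc::Event, instead of receiving frames through the task-queue callback removed above. The following is a minimal sketch of that restored API surface only (NextFrame and ReturnReason as declared in frame_buffer2.h in this patch); the RunDecodeLoop helper and kMaxWaitMs constant are illustrative placeholders and not WebRTC code, and the production logic (QP extraction, keyframe requests, stats) lives in VideoReceiveStream::Decode() in the diff above.

#include <memory>

#include "modules/video_coding/frame_buffer2.h"

namespace {

// Hypothetical helper: drains the frame buffer from a dedicated thread using
// the blocking NextFrame() that this revert restores.
void RunDecodeLoop(webrtc::video_coding::FrameBuffer* frame_buffer) {
  constexpr int64_t kMaxWaitMs = 3000;  // Placeholder wait budget.
  for (;;) {
    std::unique_ptr<webrtc::video_coding::EncodedFrame> frame;
    // NextFrame() blocks on an internal rtc::Event for up to kMaxWaitMs.
    auto res = frame_buffer->NextFrame(kMaxWaitMs, &frame,
                                       /*keyframe_required=*/false);
    if (res == webrtc::video_coding::FrameBuffer::ReturnReason::kStopped)
      return;  // FrameBuffer::Stop() released the waiting thread.
    if (frame) {
      // kFrameFound: hand the frame to the decoder here.
    } else {
      // kTimeout: the real code may request a keyframe at this point.
    }
  }
}

}  // namespace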