From fd9500e3b5a2ac4d4835ed507a6f0b4a41e4172b Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Fri, 15 Jan 2021 17:29:53 +0100 Subject: [PATCH] In criket::BaseChannel replace AsyncInvoker with task queue functions all invokes, as well as BaseChannel constructor and destructor should run on the same task queue which allow to use simpler cancellation of pending task on BaseChannel destruction Bug: webrtc:12339 Change-Id: I311b6de940cc24cf6bb5b49e1bbd132fea2439e4 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202032 Reviewed-by: Tommi Commit-Queue: Tommi Cr-Commit-Position: refs/heads/master@{#33009} --- pc/BUILD.gn | 2 ++ pc/channel.cc | 43 +++++++++++++++++++++++-------------------- pc/channel.h | 4 ++-- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 9e296b5c3a..0155bc7c2e 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -121,6 +121,8 @@ rtc_library("rtc_pc_base") { "../rtc_base/synchronization:sequence_checker", "../rtc_base/system:file_wrapper", "../rtc_base/system:rtc_export", + "../rtc_base/task_utils:pending_task_safety_flag", + "../rtc_base/task_utils:to_queued_task", "../rtc_base/third_party/base64", "../rtc_base/third_party/sigslot", "../system_wrappers:field_trial", diff --git a/pc/channel.cc b/pc/channel.cc index 69b2ca1676..98a8f9dd92 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -32,15 +32,19 @@ #include "rtc_base/strings/string_builder.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/trace_event.h" namespace cricket { -using rtc::Bind; -using rtc::UniqueRandomIdGenerator; -using webrtc::SdpType; - namespace { +using ::rtc::Bind; +using ::rtc::UniqueRandomIdGenerator; +using ::webrtc::PendingTaskSafetyFlag; +using ::webrtc::SdpType; +using ::webrtc::ToQueuedTask; + struct SendPacketMessageData : public rtc::MessageData { rtc::CopyOnWriteBuffer packet; rtc::PacketOptions options; @@ -135,6 +139,7 @@ BaseChannel::BaseChannel(rtc::Thread* worker_thread, : worker_thread_(worker_thread), network_thread_(network_thread), signaling_thread_(signaling_thread), + alive_(PendingTaskSafetyFlag::Create()), content_name_(content_name), srtp_required_(srtp_required), crypto_options_(crypto_options), @@ -151,7 +156,7 @@ BaseChannel::~BaseChannel() { RTC_DCHECK_RUN_ON(worker_thread_); // Eats any outstanding messages or packets. - worker_thread_->Clear(&invoker_); + alive_->SetNotAlive(); worker_thread_->Clear(this); // The media channel is destroyed at the end of the destructor, since it // is a std::unique_ptr. The transport channel (rtp_transport) must outlive @@ -223,7 +228,6 @@ void BaseChannel::Deinit() { DisconnectFromRtpTransport(); } // Clear pending read packets/messages. - network_thread_->Clear(&invoker_); network_thread_->Clear(this); }); } @@ -401,10 +405,10 @@ void BaseChannel::OnNetworkRouteChanged( // use the same transport name and MediaChannel::OnNetworkRouteChanged cannot // work correctly. Intentionally leave it broken to simplify the code and // encourage the users to stop using non-muxing RTCP. - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [=] { + worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] { RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->OnNetworkRouteChanged(transport_name_, new_route); - }); + })); } sigslot::signal1& BaseChannel::SignalFirstPacketReceived() { @@ -420,10 +424,10 @@ sigslot::signal1& BaseChannel::SignalSentPacket() { } void BaseChannel::OnTransportReadyToSend(bool ready) { - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [=] { + worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] { RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->OnReadyToSend(ready); - }); + })); } bool BaseChannel::SendPacket(bool rtcp, @@ -527,11 +531,11 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { auto packet_buffer = parsed_packet.Buffer(); - invoker_.AsyncInvoke( - RTC_FROM_HERE, worker_thread_, [this, packet_buffer, packet_time_us] { + worker_thread_->PostTask( + ToQueuedTask(alive_, [this, packet_buffer, packet_time_us] { RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->OnPacketReceived(packet_buffer, packet_time_us); - }); + })); } void BaseChannel::EnableMedia_w() { @@ -574,11 +578,11 @@ void BaseChannel::ChannelWritable_n() { // We only have to do this AsyncInvoke once, when first transitioning to // writable. if (!was_ever_writable_n_) { - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [this] { + worker_thread_->PostTask(ToQueuedTask(alive_, [this] { RTC_DCHECK_RUN_ON(worker_thread()); was_ever_writable_ = true; UpdateMediaSendRecvState_w(); - }); + })); } was_ever_writable_n_ = true; } @@ -830,11 +834,10 @@ void BaseChannel::FlushRtcpMessages_n() { } void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) { - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, - [this, sent_packet] { - RTC_DCHECK_RUN_ON(worker_thread()); - SignalSentPacket()(sent_packet); - }); + worker_thread_->PostTask(ToQueuedTask(alive_, [this, sent_packet] { + RTC_DCHECK_RUN_ON(worker_thread()); + SignalSentPacket()(sent_packet); + })); } void BaseChannel::SetNegotiatedHeaderExtensions_w( diff --git a/pc/channel.h b/pc/channel.h index bbb95d7ea1..8240582595 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -36,10 +36,10 @@ #include "pc/rtp_transport.h" #include "pc/srtp_filter.h" #include "pc/srtp_transport.h" -#include "rtc_base/async_invoker.h" #include "rtc_base/async_udp_socket.h" #include "rtc_base/network.h" #include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/unique_id_generator.h" @@ -325,7 +325,7 @@ class BaseChannel : public ChannelInterface, rtc::Thread* const worker_thread_; rtc::Thread* const network_thread_; rtc::Thread* const signaling_thread_; - rtc::AsyncInvoker invoker_; + rtc::scoped_refptr alive_; sigslot::signal1 SignalFirstPacketReceived_ RTC_GUARDED_BY(signaling_thread_); sigslot::signal1 SignalSentPacket_