diff --git a/pc/BUILD.gn b/pc/BUILD.gn index 9e296b5c3a..0155bc7c2e 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn @@ -121,6 +121,8 @@ rtc_library("rtc_pc_base") { "../rtc_base/synchronization:sequence_checker", "../rtc_base/system:file_wrapper", "../rtc_base/system:rtc_export", + "../rtc_base/task_utils:pending_task_safety_flag", + "../rtc_base/task_utils:to_queued_task", "../rtc_base/third_party/base64", "../rtc_base/third_party/sigslot", "../system_wrappers:field_trial", diff --git a/pc/channel.cc b/pc/channel.cc index 69b2ca1676..98a8f9dd92 100644 --- a/pc/channel.cc +++ b/pc/channel.cc @@ -32,15 +32,19 @@ #include "rtc_base/strings/string_builder.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" +#include "rtc_base/task_utils/to_queued_task.h" #include "rtc_base/trace_event.h" namespace cricket { -using rtc::Bind; -using rtc::UniqueRandomIdGenerator; -using webrtc::SdpType; - namespace { +using ::rtc::Bind; +using ::rtc::UniqueRandomIdGenerator; +using ::webrtc::PendingTaskSafetyFlag; +using ::webrtc::SdpType; +using ::webrtc::ToQueuedTask; + struct SendPacketMessageData : public rtc::MessageData { rtc::CopyOnWriteBuffer packet; rtc::PacketOptions options; @@ -135,6 +139,7 @@ BaseChannel::BaseChannel(rtc::Thread* worker_thread, : worker_thread_(worker_thread), network_thread_(network_thread), signaling_thread_(signaling_thread), + alive_(PendingTaskSafetyFlag::Create()), content_name_(content_name), srtp_required_(srtp_required), crypto_options_(crypto_options), @@ -151,7 +156,7 @@ BaseChannel::~BaseChannel() { RTC_DCHECK_RUN_ON(worker_thread_); // Eats any outstanding messages or packets. - worker_thread_->Clear(&invoker_); + alive_->SetNotAlive(); worker_thread_->Clear(this); // The media channel is destroyed at the end of the destructor, since it // is a std::unique_ptr. The transport channel (rtp_transport) must outlive @@ -223,7 +228,6 @@ void BaseChannel::Deinit() { DisconnectFromRtpTransport(); } // Clear pending read packets/messages. - network_thread_->Clear(&invoker_); network_thread_->Clear(this); }); } @@ -401,10 +405,10 @@ void BaseChannel::OnNetworkRouteChanged( // use the same transport name and MediaChannel::OnNetworkRouteChanged cannot // work correctly. Intentionally leave it broken to simplify the code and // encourage the users to stop using non-muxing RTCP. - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [=] { + worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] { RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->OnNetworkRouteChanged(transport_name_, new_route); - }); + })); } sigslot::signal1& BaseChannel::SignalFirstPacketReceived() { @@ -420,10 +424,10 @@ sigslot::signal1& BaseChannel::SignalSentPacket() { } void BaseChannel::OnTransportReadyToSend(bool ready) { - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [=] { + worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] { RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->OnReadyToSend(ready); - }); + })); } bool BaseChannel::SendPacket(bool rtcp, @@ -527,11 +531,11 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) { auto packet_buffer = parsed_packet.Buffer(); - invoker_.AsyncInvoke( - RTC_FROM_HERE, worker_thread_, [this, packet_buffer, packet_time_us] { + worker_thread_->PostTask( + ToQueuedTask(alive_, [this, packet_buffer, packet_time_us] { RTC_DCHECK_RUN_ON(worker_thread()); media_channel_->OnPacketReceived(packet_buffer, packet_time_us); - }); + })); } void BaseChannel::EnableMedia_w() { @@ -574,11 +578,11 @@ void BaseChannel::ChannelWritable_n() { // We only have to do this AsyncInvoke once, when first transitioning to // writable. if (!was_ever_writable_n_) { - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, [this] { + worker_thread_->PostTask(ToQueuedTask(alive_, [this] { RTC_DCHECK_RUN_ON(worker_thread()); was_ever_writable_ = true; UpdateMediaSendRecvState_w(); - }); + })); } was_ever_writable_n_ = true; } @@ -830,11 +834,10 @@ void BaseChannel::FlushRtcpMessages_n() { } void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) { - invoker_.AsyncInvoke(RTC_FROM_HERE, worker_thread_, - [this, sent_packet] { - RTC_DCHECK_RUN_ON(worker_thread()); - SignalSentPacket()(sent_packet); - }); + worker_thread_->PostTask(ToQueuedTask(alive_, [this, sent_packet] { + RTC_DCHECK_RUN_ON(worker_thread()); + SignalSentPacket()(sent_packet); + })); } void BaseChannel::SetNegotiatedHeaderExtensions_w( diff --git a/pc/channel.h b/pc/channel.h index bbb95d7ea1..8240582595 100644 --- a/pc/channel.h +++ b/pc/channel.h @@ -36,10 +36,10 @@ #include "pc/rtp_transport.h" #include "pc/srtp_filter.h" #include "pc/srtp_transport.h" -#include "rtc_base/async_invoker.h" #include "rtc_base/async_udp_socket.h" #include "rtc_base/network.h" #include "rtc_base/synchronization/sequence_checker.h" +#include "rtc_base/task_utils/pending_task_safety_flag.h" #include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/thread_annotations.h" #include "rtc_base/unique_id_generator.h" @@ -325,7 +325,7 @@ class BaseChannel : public ChannelInterface, rtc::Thread* const worker_thread_; rtc::Thread* const network_thread_; rtc::Thread* const signaling_thread_; - rtc::AsyncInvoker invoker_; + rtc::scoped_refptr alive_; sigslot::signal1 SignalFirstPacketReceived_ RTC_GUARDED_BY(signaling_thread_); sigslot::signal1 SignalSentPacket_