In criket::BaseChannel replace AsyncInvoker with task queue functions

all invokes, as well as BaseChannel constructor and destructor
should run on the same task queue which allow to use
simpler cancellation of pending task on BaseChannel destruction

Bug: webrtc:12339
Change-Id: I311b6de940cc24cf6bb5b49e1bbd132fea2439e4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/202032
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33009}
This commit is contained in:
Danil Chapovalov
2021-01-15 17:29:53 +01:00
committed by Commit Bot
parent 8ed61858c3
commit fd9500e3b5
3 changed files with 27 additions and 22 deletions

View File

@ -121,6 +121,8 @@ rtc_library("rtc_pc_base") {
"../rtc_base/synchronization:sequence_checker", "../rtc_base/synchronization:sequence_checker",
"../rtc_base/system:file_wrapper", "../rtc_base/system:file_wrapper",
"../rtc_base/system:rtc_export", "../rtc_base/system:rtc_export",
"../rtc_base/task_utils:pending_task_safety_flag",
"../rtc_base/task_utils:to_queued_task",
"../rtc_base/third_party/base64", "../rtc_base/third_party/base64",
"../rtc_base/third_party/sigslot", "../rtc_base/third_party/sigslot",
"../system_wrappers:field_trial", "../system_wrappers:field_trial",

View File

@ -32,15 +32,19 @@
#include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_builder.h"
#include "rtc_base/synchronization/mutex.h" #include "rtc_base/synchronization/mutex.h"
#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/trace_event.h" #include "rtc_base/trace_event.h"
namespace cricket { namespace cricket {
using rtc::Bind;
using rtc::UniqueRandomIdGenerator;
using webrtc::SdpType;
namespace { namespace {
using ::rtc::Bind;
using ::rtc::UniqueRandomIdGenerator;
using ::webrtc::PendingTaskSafetyFlag;
using ::webrtc::SdpType;
using ::webrtc::ToQueuedTask;
struct SendPacketMessageData : public rtc::MessageData { struct SendPacketMessageData : public rtc::MessageData {
rtc::CopyOnWriteBuffer packet; rtc::CopyOnWriteBuffer packet;
rtc::PacketOptions options; rtc::PacketOptions options;
@ -135,6 +139,7 @@ BaseChannel::BaseChannel(rtc::Thread* worker_thread,
: worker_thread_(worker_thread), : worker_thread_(worker_thread),
network_thread_(network_thread), network_thread_(network_thread),
signaling_thread_(signaling_thread), signaling_thread_(signaling_thread),
alive_(PendingTaskSafetyFlag::Create()),
content_name_(content_name), content_name_(content_name),
srtp_required_(srtp_required), srtp_required_(srtp_required),
crypto_options_(crypto_options), crypto_options_(crypto_options),
@ -151,7 +156,7 @@ BaseChannel::~BaseChannel() {
RTC_DCHECK_RUN_ON(worker_thread_); RTC_DCHECK_RUN_ON(worker_thread_);
// Eats any outstanding messages or packets. // Eats any outstanding messages or packets.
worker_thread_->Clear(&invoker_); alive_->SetNotAlive();
worker_thread_->Clear(this); worker_thread_->Clear(this);
// The media channel is destroyed at the end of the destructor, since it // The media channel is destroyed at the end of the destructor, since it
// is a std::unique_ptr. The transport channel (rtp_transport) must outlive // is a std::unique_ptr. The transport channel (rtp_transport) must outlive
@ -223,7 +228,6 @@ void BaseChannel::Deinit() {
DisconnectFromRtpTransport(); DisconnectFromRtpTransport();
} }
// Clear pending read packets/messages. // Clear pending read packets/messages.
network_thread_->Clear(&invoker_);
network_thread_->Clear(this); network_thread_->Clear(this);
}); });
} }
@ -401,10 +405,10 @@ void BaseChannel::OnNetworkRouteChanged(
// use the same transport name and MediaChannel::OnNetworkRouteChanged cannot // use the same transport name and MediaChannel::OnNetworkRouteChanged cannot
// work correctly. Intentionally leave it broken to simplify the code and // work correctly. Intentionally leave it broken to simplify the code and
// encourage the users to stop using non-muxing RTCP. // encourage the users to stop using non-muxing RTCP.
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [=] { worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] {
RTC_DCHECK_RUN_ON(worker_thread()); RTC_DCHECK_RUN_ON(worker_thread());
media_channel_->OnNetworkRouteChanged(transport_name_, new_route); media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
}); }));
} }
sigslot::signal1<ChannelInterface*>& BaseChannel::SignalFirstPacketReceived() { sigslot::signal1<ChannelInterface*>& BaseChannel::SignalFirstPacketReceived() {
@ -420,10 +424,10 @@ sigslot::signal1<const rtc::SentPacket&>& BaseChannel::SignalSentPacket() {
} }
void BaseChannel::OnTransportReadyToSend(bool ready) { void BaseChannel::OnTransportReadyToSend(bool ready) {
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [=] { worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] {
RTC_DCHECK_RUN_ON(worker_thread()); RTC_DCHECK_RUN_ON(worker_thread());
media_channel_->OnReadyToSend(ready); media_channel_->OnReadyToSend(ready);
}); }));
} }
bool BaseChannel::SendPacket(bool rtcp, bool BaseChannel::SendPacket(bool rtcp,
@ -527,11 +531,11 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) {
auto packet_buffer = parsed_packet.Buffer(); auto packet_buffer = parsed_packet.Buffer();
invoker_.AsyncInvoke<void>( worker_thread_->PostTask(
RTC_FROM_HERE, worker_thread_, [this, packet_buffer, packet_time_us] { ToQueuedTask(alive_, [this, packet_buffer, packet_time_us] {
RTC_DCHECK_RUN_ON(worker_thread()); RTC_DCHECK_RUN_ON(worker_thread());
media_channel_->OnPacketReceived(packet_buffer, packet_time_us); media_channel_->OnPacketReceived(packet_buffer, packet_time_us);
}); }));
} }
void BaseChannel::EnableMedia_w() { void BaseChannel::EnableMedia_w() {
@ -574,11 +578,11 @@ void BaseChannel::ChannelWritable_n() {
// We only have to do this AsyncInvoke once, when first transitioning to // We only have to do this AsyncInvoke once, when first transitioning to
// writable. // writable.
if (!was_ever_writable_n_) { if (!was_ever_writable_n_) {
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [this] { worker_thread_->PostTask(ToQueuedTask(alive_, [this] {
RTC_DCHECK_RUN_ON(worker_thread()); RTC_DCHECK_RUN_ON(worker_thread());
was_ever_writable_ = true; was_ever_writable_ = true;
UpdateMediaSendRecvState_w(); UpdateMediaSendRecvState_w();
}); }));
} }
was_ever_writable_n_ = true; was_ever_writable_n_ = true;
} }
@ -830,11 +834,10 @@ void BaseChannel::FlushRtcpMessages_n() {
} }
void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) { void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) {
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, worker_thread_->PostTask(ToQueuedTask(alive_, [this, sent_packet] {
[this, sent_packet] { RTC_DCHECK_RUN_ON(worker_thread());
RTC_DCHECK_RUN_ON(worker_thread()); SignalSentPacket()(sent_packet);
SignalSentPacket()(sent_packet); }));
});
} }
void BaseChannel::SetNegotiatedHeaderExtensions_w( void BaseChannel::SetNegotiatedHeaderExtensions_w(

View File

@ -36,10 +36,10 @@
#include "pc/rtp_transport.h" #include "pc/rtp_transport.h"
#include "pc/srtp_filter.h" #include "pc/srtp_filter.h"
#include "pc/srtp_transport.h" #include "pc/srtp_transport.h"
#include "rtc_base/async_invoker.h"
#include "rtc_base/async_udp_socket.h" #include "rtc_base/async_udp_socket.h"
#include "rtc_base/network.h" #include "rtc_base/network.h"
#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/third_party/sigslot/sigslot.h" #include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
#include "rtc_base/unique_id_generator.h" #include "rtc_base/unique_id_generator.h"
@ -325,7 +325,7 @@ class BaseChannel : public ChannelInterface,
rtc::Thread* const worker_thread_; rtc::Thread* const worker_thread_;
rtc::Thread* const network_thread_; rtc::Thread* const network_thread_;
rtc::Thread* const signaling_thread_; rtc::Thread* const signaling_thread_;
rtc::AsyncInvoker invoker_; rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> alive_;
sigslot::signal1<ChannelInterface*> SignalFirstPacketReceived_ sigslot::signal1<ChannelInterface*> SignalFirstPacketReceived_
RTC_GUARDED_BY(signaling_thread_); RTC_GUARDED_BY(signaling_thread_);
sigslot::signal1<const rtc::SentPacket&> SignalSentPacket_ sigslot::signal1<const rtc::SentPacket&> SignalSentPacket_