Delete AsyncInvoker usage in DataChannelController

Tasks access this via WeakPtrFactory.

Bug: webrtc:12339
Change-Id: I0aaeffd4bed59a6abfadf995286644c24c1fd716
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/212721
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33560}
This commit is contained in:
Niels Möller
2021-03-23 09:23:10 +01:00
committed by Commit Bot
parent 4c555cca2d
commit 236e36c25f
2 changed files with 59 additions and 45 deletions

View File

@ -22,6 +22,7 @@
#include "rtc_base/location.h" #include "rtc_base/location.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/string_encode.h" #include "rtc_base/string_encode.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace webrtc { namespace webrtc {
@ -137,9 +138,10 @@ void DataChannelController::OnDataReceived(
cricket::ReceiveDataParams params; cricket::ReceiveDataParams params;
params.sid = channel_id; params.sid = channel_id;
params.type = ToCricketDataMessageType(type); params.type = ToCricketDataMessageType(type);
data_channel_transport_invoker_.AsyncInvoke<void>( signaling_thread()->PostTask(
RTC_FROM_HERE, signaling_thread(), [this, params, buffer] { ToQueuedTask([self = weak_factory_.GetWeakPtr(), params, buffer] {
RTC_DCHECK_RUN_ON(signaling_thread()); if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
// TODO(bugs.webrtc.org/11547): The data being received should be // TODO(bugs.webrtc.org/11547): The data being received should be
// delivered on the network thread. The way HandleOpenMessage_s works // delivered on the network thread. The way HandleOpenMessage_s works
// right now is that it's called for all types of buffers and operates // right now is that it's called for all types of buffers and operates
@ -148,48 +150,57 @@ void DataChannelController::OnDataReceived(
// deliver all other buffers on the network thread (change // deliver all other buffers on the network thread (change
// SignalDataChannelTransportReceivedData_s to // SignalDataChannelTransportReceivedData_s to
// SignalDataChannelTransportReceivedData_n). // SignalDataChannelTransportReceivedData_n).
if (!HandleOpenMessage_s(params, buffer)) { if (!self->HandleOpenMessage_s(params, buffer)) {
SignalDataChannelTransportReceivedData_s(params, buffer); self->SignalDataChannelTransportReceivedData_s(params, buffer);
} }
}); }
}));
} }
void DataChannelController::OnChannelClosing(int channel_id) { void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_.AsyncInvoke<void>( signaling_thread()->PostTask(
RTC_FROM_HERE, signaling_thread(), [this, channel_id] { ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
RTC_DCHECK_RUN_ON(signaling_thread()); if (self) {
SignalDataChannelTransportChannelClosing_s(channel_id); RTC_DCHECK_RUN_ON(self->signaling_thread());
}); self->SignalDataChannelTransportChannelClosing_s(channel_id);
}
}));
} }
void DataChannelController::OnChannelClosed(int channel_id) { void DataChannelController::OnChannelClosed(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_.AsyncInvoke<void>( signaling_thread()->PostTask(
RTC_FROM_HERE, signaling_thread(), [this, channel_id] { ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
RTC_DCHECK_RUN_ON(signaling_thread()); if (self) {
SignalDataChannelTransportChannelClosed_s(channel_id); RTC_DCHECK_RUN_ON(self->signaling_thread());
}); self->SignalDataChannelTransportChannelClosed_s(channel_id);
}
}));
} }
void DataChannelController::OnReadyToSend() { void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_.AsyncInvoke<void>( signaling_thread()->PostTask(
RTC_FROM_HERE, signaling_thread(), [this] { ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
RTC_DCHECK_RUN_ON(signaling_thread()); if (self) {
data_channel_transport_ready_to_send_ = true; RTC_DCHECK_RUN_ON(self->signaling_thread());
SignalDataChannelTransportWritable_s( self->data_channel_transport_ready_to_send_ = true;
data_channel_transport_ready_to_send_); self->SignalDataChannelTransportWritable_s(
}); self->data_channel_transport_ready_to_send_);
}
}));
} }
void DataChannelController::OnTransportClosed() { void DataChannelController::OnTransportClosed() {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_.AsyncInvoke<void>( signaling_thread()->PostTask(
RTC_FROM_HERE, signaling_thread(), [this] { ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
RTC_DCHECK_RUN_ON(signaling_thread()); if (self) {
OnTransportChannelClosed(); RTC_DCHECK_RUN_ON(self->signaling_thread());
}); self->OnTransportChannelClosed();
}
}));
} }
void DataChannelController::SetupDataChannelTransport_n() { void DataChannelController::SetupDataChannelTransport_n() {
@ -392,12 +403,12 @@ void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
sctp_data_channels_to_free_.push_back(*it); sctp_data_channels_to_free_.push_back(*it);
sctp_data_channels_.erase(it); sctp_data_channels_.erase(it);
signaling_thread()->PostTask( signaling_thread()->PostTask(
RTC_FROM_HERE, [self = weak_factory_.GetWeakPtr()] { ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
if (self) { if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread()); RTC_DCHECK_RUN_ON(self->signaling_thread());
self->sctp_data_channels_to_free_.clear(); self->sctp_data_channels_to_free_.clear();
} }
}); }));
return; return;
} }
} }
@ -598,13 +609,15 @@ bool DataChannelController::DataChannelSendData(
void DataChannelController::NotifyDataChannelsOfTransportCreated() { void DataChannelController::NotifyDataChannelsOfTransportCreated() {
RTC_DCHECK_RUN_ON(network_thread()); RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_.AsyncInvoke<void>( signaling_thread()->PostTask(
RTC_FROM_HERE, signaling_thread(), [this] { ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
RTC_DCHECK_RUN_ON(signaling_thread()); if (self) {
for (const auto& channel : sctp_data_channels_) { RTC_DCHECK_RUN_ON(self->signaling_thread());
for (const auto& channel : self->sctp_data_channels_) {
channel->OnTransportChannelCreated(); channel->OnTransportChannelCreated();
} }
}); }
}));
} }
rtc::Thread* DataChannelController::network_thread() const { rtc::Thread* DataChannelController::network_thread() const {

View File

@ -29,7 +29,6 @@
#include "pc/data_channel_utils.h" #include "pc/data_channel_utils.h"
#include "pc/rtp_data_channel.h" #include "pc/rtp_data_channel.h"
#include "pc/sctp_data_channel.h" #include "pc/sctp_data_channel.h"
#include "rtc_base/async_invoker.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/copy_on_write_buffer.h" #include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/ssl_stream_adapter.h" #include "rtc_base/ssl_stream_adapter.h"
@ -245,6 +244,8 @@ class DataChannelController : public RtpDataChannelProviderInterface,
// Owning PeerConnection. // Owning PeerConnection.
PeerConnection* const pc_; PeerConnection* const pc_;
// The weak pointers must be dereferenced and invalidated on the signalling
// thread only.
rtc::WeakPtrFactory<DataChannelController> weak_factory_{this}; rtc::WeakPtrFactory<DataChannelController> weak_factory_{this};
}; };