Migrate pc/ to absl::AnyInvocable based TaskQueueBase interface

Bug: webrtc:14245
Change-Id: I9043aa507421a93f0d7ba7406e237f727999b696
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268121
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37478}
This commit is contained in:
Danil Chapovalov
2022-07-07 10:08:49 +02:00
committed by WebRTC LUCI CQ
parent b7128ed172
commit a30439bbe6
15 changed files with 140 additions and 188 deletions

View File

@ -78,7 +78,6 @@ rtc_source_set("channel") {
"../api:sequence_checker",
"../api/crypto:options",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/units:timestamp",
"../call:rtp_interfaces",
"../call:rtp_receiver",
@ -798,7 +797,6 @@ rtc_source_set("peerconnection") {
"../api/rtc_event_log",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/transport:bitrate_settings",
"../api/transport:datagram_transport_interface",
"../api/transport:enums",
@ -868,7 +866,6 @@ rtc_library("sctp_data_channel") {
"../api:priority",
"../api:rtc_error",
"../api:scoped_refptr",
"../api/task_queue:to_queued_task",
"../api/transport:datagram_transport_interface",
"../media:rtc_data_sctp_transport_internal",
"../media:rtc_media_base",
@ -883,7 +880,10 @@ rtc_library("sctp_data_channel") {
"../rtc_base/system:unused",
"../rtc_base/third_party/sigslot:sigslot",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
absl_deps = [
"//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/types:optional",
]
}
rtc_library("data_channel_utils") {
@ -916,7 +916,6 @@ rtc_library("connection_context") {
"../api:scoped_refptr",
"../api:sequence_checker",
"../api/neteq:neteq_api",
"../api/task_queue:to_queued_task",
"../api/transport:field_trial_based_config",
"../api/transport:sctp_transport_factory_interface",
"../media:rtc_data_sctp_transport_factory",
@ -948,7 +947,6 @@ rtc_source_set("data_channel_controller") {
"../api:rtc_error",
"../api:scoped_refptr",
"../api:sequence_checker",
"../api/task_queue:to_queued_task",
"../api/transport:datagram_transport_interface",
"../media:rtc_media_base",
"../rtc_base",
@ -1053,6 +1051,7 @@ rtc_source_set("rtc_stats_collector") {
"../rtc_base/third_party/sigslot:sigslot",
]
absl_deps = [
"//third_party/abseil-cpp/absl/functional:bind_front",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
@ -1213,7 +1212,6 @@ rtc_source_set("peer_connection") {
"../api/crypto:options",
"../api/rtc_event_log",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/transport:bitrate_settings",
"../api/transport:datagram_transport_interface",
"../api/transport:enums",
@ -1577,7 +1575,6 @@ rtc_library("rtp_transceiver") {
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/video:video_bitrate_allocator_factory",
"../media:rtc_media_base",
"../rtc_base:checks",
@ -1709,7 +1706,6 @@ rtc_library("audio_rtp_receiver") {
"../api:sequence_checker",
"../api/crypto:frame_decryptor_interface",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/transport/rtp:rtp_source",
"../media:rtc_media_base",
"../rtc_base",
@ -1950,15 +1946,14 @@ rtc_library("dtmf_sender") {
"../api:libjingle_peerconnection_api",
"../api:scoped_refptr",
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../rtc_base:checks",
"../rtc_base:location",
"../rtc_base:logging",
"../rtc_base:macromagic",
"../rtc_base:refcount",
"../rtc_base:rtc_base",
"../rtc_base:threading",
"../rtc_base/third_party/sigslot",
]
absl_deps = [
@ -2098,7 +2093,6 @@ if (rtc_include_tests && !build_with_chromium) {
"../api:sequence_checker",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:task_queue",
"../api/task_queue:to_queued_task",
"../api/transport:datagram_transport_interface",
"../api/transport:enums",
"../api/video:builtin_video_bitrate_allocator_factory",
@ -2135,7 +2129,10 @@ if (rtc_include_tests && !build_with_chromium) {
"../test:scoped_key_value_config",
"../test:test_main",
"../test:test_support",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
@ -2566,7 +2563,6 @@ if (rtc_include_tests && !build_with_chromium) {
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
"../api/transport:field_trial_based_config",
"../api/transport/rtp:rtp_source",
"../api/units:time_delta",

View File

@ -17,7 +17,6 @@
#include <vector>
#include "api/sequence_checker.h"
#include "api/task_queue/to_queued_task.h"
#include "pc/audio_track.h"
#include "pc/media_stream_track_proxy.h"
#include "rtc_base/checks.h"
@ -78,8 +77,7 @@ void AudioRtpReceiver::OnChanged() {
if (cached_track_enabled_ == enabled)
return;
cached_track_enabled_ = enabled;
worker_thread_->PostTask(
ToQueuedTask(worker_thread_safety_, [this, enabled]() {
worker_thread_->PostTask(SafeTask(worker_thread_safety_, [this, enabled]() {
RTC_DCHECK_RUN_ON(worker_thread_);
Reconfigure(enabled);
}));

View File

@ -20,7 +20,6 @@
#include "api/rtp_parameters.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/to_queued_task.h"
#include "api/units/timestamp.h"
#include "media/base/codec.h"
#include "media/base/rid_description.h"
@ -43,7 +42,6 @@ using ::rtc::StringFormat;
using ::rtc::UniqueRandomIdGenerator;
using ::webrtc::PendingTaskSafetyFlag;
using ::webrtc::SdpType;
using ::webrtc::ToQueuedTask;
// Finds a stream based on target's Primary SSRC or RIDs.
// This struct is used in BaseChannel::UpdateLocalStreams_w.
@ -197,7 +195,7 @@ bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) {
if (rtp_transport_) {
DisconnectFromRtpTransport_n();
// Clear the cached header extensions on the worker.
worker_thread_->PostTask(ToQueuedTask(alive_, [this] {
worker_thread_->PostTask(SafeTask(alive_, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
rtp_header_extensions_.clear();
}));
@ -237,7 +235,7 @@ void BaseChannel::Enable(bool enable) {
enabled_s_ = enable;
worker_thread_->PostTask(ToQueuedTask(alive_, [this, enable] {
worker_thread_->PostTask(SafeTask(alive_, [this, enable] {
RTC_DCHECK_RUN_ON(worker_thread());
// Sanity check to make sure that enabled_ and enabled_s_
// stay in sync.
@ -552,7 +550,7 @@ void BaseChannel::ChannelWritable_n() {
// We only have to do this PostTask once, when first transitioning to
// writable.
if (!was_ever_writable_n_) {
worker_thread_->PostTask(ToQueuedTask(alive_, [this] {
worker_thread_->PostTask(SafeTask(alive_, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
was_ever_writable_ = true;
UpdateMediaSendRecvState_w();

View File

@ -16,11 +16,11 @@
#include <string>
#include <type_traits>
#include "absl/functional/any_invocable.h"
#include "api/array_view.h"
#include "api/audio_options.h"
#include "api/rtp_parameters.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/to_queued_task.h"
#include "media/base/codec.h"
#include "media/base/fake_media_engine.h"
#include "media/base/fake_rtp.h"
@ -419,7 +419,7 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
}
void SendRtp(typename T::MediaChannel* media_channel, rtc::Buffer data) {
network_thread_->PostTask(webrtc::ToQueuedTask(
network_thread_->PostTask(webrtc::SafeTask(
network_thread_safety_, [media_channel, data = std::move(data)]() {
media_channel->SendRtp(data.data(), data.size(),
rtc::PacketOptions());
@ -503,11 +503,10 @@ class ChannelTest : public ::testing::Test, public sigslot::has_slots<> {
// destroyed before this object goes out of scope.
class ScopedCallThread {
public:
template <class FunctorT>
explicit ScopedCallThread(FunctorT&& functor)
explicit ScopedCallThread(absl::AnyInvocable<void() &&> functor)
: thread_(rtc::Thread::Create()) {
thread_->Start();
thread_->PostTask(std::forward<FunctorT>(functor));
thread_->PostTask(std::move(functor));
}
~ScopedCallThread() { thread_->Stop(); }

View File

@ -14,7 +14,6 @@
#include <utility>
#include <vector>
#include "api/task_queue/to_queued_task.h"
#include "api/transport/field_trial_based_config.h"
#include "media/base/media_engine.h"
#include "media/sctp/sctp_transport_factory.h"
@ -120,7 +119,7 @@ ConnectionContext::ConnectionContext(
// network_thread_->IsCurrent() == true means signaling_thread_ is
// network_thread_. In this case, no further action is required as
// signaling_thread_ can already invoke network_thread_.
network_thread_->PostTask(ToQueuedTask(
network_thread_->PostTask(
[thread = network_thread_, worker_thread = worker_thread_.get()] {
thread->DisallowBlockingCalls();
thread->DisallowAllInvokes();
@ -128,7 +127,7 @@ ConnectionContext::ConnectionContext(
// In this case, worker_thread_ == network_thread_
thread->AllowInvokesToThread(thread);
}
}));
});
}
rtc::InitRandom(rtc::Time32());

View File

@ -14,7 +14,6 @@
#include "api/peer_connection_interface.h"
#include "api/rtc_error.h"
#include "api/task_queue/to_queued_task.h"
#include "pc/peer_connection_internal.h"
#include "pc/sctp_utils.h"
#include "rtc_base/location.h"
@ -114,7 +113,7 @@ void DataChannelController::OnDataReceived(
params.sid = channel_id;
params.type = type;
signaling_thread()->PostTask(
ToQueuedTask([self = weak_factory_.GetWeakPtr(), params, buffer] {
[self = weak_factory_.GetWeakPtr(), params, buffer] {
if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
// TODO(bugs.webrtc.org/11547): The data being received should be
@ -129,53 +128,49 @@ void DataChannelController::OnDataReceived(
self->SignalDataChannelTransportReceivedData_s(params, buffer);
}
}
}));
});
}
void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
self->SignalDataChannelTransportChannelClosing_s(channel_id);
}
}));
});
}
void DataChannelController::OnChannelClosed(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask([self = weak_factory_.GetWeakPtr(), channel_id] {
signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), channel_id] {
if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
self->SignalDataChannelTransportChannelClosed_s(channel_id);
}
}));
});
}
void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
self->data_channel_transport_ready_to_send_ = true;
self->SignalDataChannelTransportWritable_s(
self->data_channel_transport_ready_to_send_);
}
}));
});
}
void DataChannelController::OnTransportClosed(RTCError error) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask([self = weak_factory_.GetWeakPtr(), error] {
signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr(), error] {
if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
self->OnTransportChannelClosed(error);
}
}));
});
}
void DataChannelController::SetupDataChannelTransport_n() {
@ -345,13 +340,12 @@ void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
// we can't free it directly here; we need to free it asynchronously.
sctp_data_channels_to_free_.push_back(*it);
sctp_data_channels_.erase(it);
signaling_thread()->PostTask(
ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
self->sctp_data_channels_to_free_.clear();
}
}));
});
return;
}
}
@ -413,15 +407,14 @@ bool DataChannelController::DataChannelSendData(
void DataChannelController::NotifyDataChannelsOfTransportCreated() {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask([self = weak_factory_.GetWeakPtr()] {
signaling_thread()->PostTask([self = weak_factory_.GetWeakPtr()] {
if (self) {
RTC_DCHECK_RUN_ON(self->signaling_thread());
for (const auto& channel : self->sctp_data_channels_) {
channel->OnTransportChannelCreated();
}
}
}));
});
}
rtc::Thread* DataChannelController::network_thread() const {

View File

@ -13,10 +13,11 @@
#include <ctype.h>
#include <string.h>
#include "api/task_queue/to_queued_task.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/thread.h"
namespace webrtc {
@ -57,7 +58,7 @@ bool GetDtmfCode(char tone, int* code) {
}
rtc::scoped_refptr<DtmfSender> DtmfSender::Create(
rtc::Thread* signaling_thread,
TaskQueueBase* signaling_thread,
DtmfProviderInterface* provider) {
if (!signaling_thread) {
return nullptr;
@ -65,7 +66,7 @@ rtc::scoped_refptr<DtmfSender> DtmfSender::Create(
return rtc::make_ref_counted<DtmfSender>(signaling_thread, provider);
}
DtmfSender::DtmfSender(rtc::Thread* signaling_thread,
DtmfSender::DtmfSender(TaskQueueBase* signaling_thread,
DtmfProviderInterface* provider)
: observer_(nullptr),
signaling_thread_(signaling_thread),
@ -165,12 +166,12 @@ int DtmfSender::comma_delay() const {
void DtmfSender::QueueInsertDtmf(const rtc::Location& posted_from,
uint32_t delay_ms) {
signaling_thread_->PostDelayedHighPrecisionTask(
ToQueuedTask(safety_flag_,
SafeTask(safety_flag_,
[this] {
RTC_DCHECK_RUN_ON(signaling_thread_);
DoInsertDtmf();
}),
delay_ms);
TimeDelta::Millis(delay_ms));
}
void DtmfSender::DoInsertDtmf() {

View File

@ -19,11 +19,11 @@
#include "api/scoped_refptr.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "pc/proxy.h"
#include "rtc_base/location.h"
#include "rtc_base/ref_count.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
// DtmfSender is the native implementation of the RTCDTMFSender defined by
@ -53,7 +53,7 @@ class DtmfProviderInterface {
class DtmfSender : public DtmfSenderInterface, public sigslot::has_slots<> {
public:
static rtc::scoped_refptr<DtmfSender> Create(rtc::Thread* signaling_thread,
static rtc::scoped_refptr<DtmfSender> Create(TaskQueueBase* signaling_thread,
DtmfProviderInterface* provider);
// Implements DtmfSenderInterface.
@ -70,7 +70,7 @@ class DtmfSender : public DtmfSenderInterface, public sigslot::has_slots<> {
int comma_delay() const override;
protected:
DtmfSender(rtc::Thread* signaling_thread, DtmfProviderInterface* provider);
DtmfSender(TaskQueueBase* signaling_thread, DtmfProviderInterface* provider);
virtual ~DtmfSender();
DtmfSender(const DtmfSender&) = delete;
@ -90,7 +90,7 @@ class DtmfSender : public DtmfSenderInterface, public sigslot::has_slots<> {
void StopSending() RTC_RUN_ON(signaling_thread_);
DtmfSenderObserverInterface* observer_ RTC_GUARDED_BY(signaling_thread_);
rtc::Thread* signaling_thread_;
TaskQueueBase* const signaling_thread_;
DtmfProviderInterface* provider_ RTC_GUARDED_BY(signaling_thread_);
std::string tones_ RTC_GUARDED_BY(signaling_thread_);
int duration_ RTC_GUARDED_BY(signaling_thread_);

View File

@ -25,7 +25,6 @@
#include "api/jsep_ice_candidate.h"
#include "api/rtp_parameters.h"
#include "api/rtp_transceiver_direction.h"
#include "api/task_queue/to_queued_task.h"
#include "api/uma_metrics.h"
#include "api/video/video_codec_constants.h"
#include "call/audio_state.h"
@ -726,7 +725,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n(
ReportTransportStats();
}
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerConnectionState(s);
}));
@ -735,7 +734,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n(
[this](PeerConnectionInterface::PeerConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetConnectionState(s);
}));
@ -744,7 +743,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n(
[this](PeerConnectionInterface::IceConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetStandardizedIceConnectionState(s);
}));
@ -753,7 +752,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n(
[this](cricket::IceGatheringState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
SafeTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerGatheringState(s);
}));
@ -763,7 +762,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n(
const std::vector<cricket::Candidate>& candidates) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(),
SafeTask(signaling_thread_safety_.flag(),
[this, t = transport, c = candidates]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesGathered(t, c);
@ -772,8 +771,8 @@ JsepTransportController* PeerConnection::InitializeTransportController_n(
transport_controller_->SubscribeIceCandidateError(
[this](const cricket::IceCandidateErrorEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread_safety_.flag(), [this, event = event]() {
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateError(event);
}));
@ -782,7 +781,7 @@ JsepTransportController* PeerConnection::InitializeTransportController_n(
[this](const std::vector<cricket::Candidate>& c) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, c = c]() {
SafeTask(signaling_thread_safety_.flag(), [this, c = c]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesRemoved(c);
}));
@ -790,8 +789,8 @@ JsepTransportController* PeerConnection::InitializeTransportController_n(
transport_controller_->SubscribeIceCandidatePairChanged(
[this](const cricket::CandidatePairChangeEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread_safety_.flag(), [this, event = event]() {
signaling_thread()->PostTask(
SafeTask(signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateChanged(event);
}));
@ -2497,7 +2496,7 @@ bool PeerConnection::SetupDataChannelTransport_n(const std::string& mid) {
transport_controller_->GetDtlsTransport(mid);
if (dtls_transport) {
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(),
SafeTask(signaling_thread_safety_.flag(),
[this, name = dtls_transport->transport_name()] {
RTC_DCHECK_RUN_ON(signaling_thread());
sctp_transport_name_s_ = std::move(name);
@ -2662,14 +2661,14 @@ void PeerConnection::AddRemoteCandidate(const std::string& mid,
const cricket::Candidate& candidate) {
RTC_DCHECK_RUN_ON(signaling_thread());
network_thread()->PostTask(ToQueuedTask(
network_thread()->PostTask(SafeTask(
network_thread_safety_, [this, mid = mid, candidate = candidate] {
RTC_DCHECK_RUN_ON(network_thread());
std::vector<cricket::Candidate> candidates = {candidate};
RTCError error =
transport_controller_->AddRemoteCandidates(mid, candidates);
if (error.ok()) {
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread()->PostTask(SafeTask(
signaling_thread_safety_.flag(),
[this, candidate = std::move(candidate)] {
ReportRemoteIceCandidateAdded(candidate);
@ -2916,7 +2915,7 @@ bool PeerConnection::OnTransportChanged(
if (mid == sctp_mid_n_) {
data_channel_controller_.OnTransportChanged(data_channel_transport);
if (dtls_transport) {
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread()->PostTask(SafeTask(
signaling_thread_safety_.flag(),
[this,
name = std::string(dtls_transport->internal()->transport_name())] {
@ -2942,7 +2941,7 @@ void PeerConnection::StartSctpTransport(int local_port,
if (!sctp_mid_s_)
return;
network_thread()->PostTask(ToQueuedTask(
network_thread()->PostTask(SafeTask(
network_thread_safety_,
[this, mid = *sctp_mid_s_, local_port, remote_port, max_message_size] {
rtc::scoped_refptr<SctpTransport> sctp_transport =

View File

@ -123,7 +123,7 @@ class ReturnType<void> {
};
template <typename C, typename R, typename... Args>
class MethodCall : public QueuedTask {
class MethodCall {
public:
typedef R (C::*Method)(Args...);
MethodCall(C* c, Method m, Args&&... args)
@ -135,19 +135,16 @@ class MethodCall : public QueuedTask {
if (t->IsCurrent()) {
Invoke(std::index_sequence_for<Args...>());
} else {
t->PostTask(std::unique_ptr<QueuedTask>(this));
t->PostTask([this] {
Invoke(std::index_sequence_for<Args...>());
event_.Set();
});
event_.Wait(rtc::Event::kForever);
}
return r_.moved_result();
}
private:
bool Run() override {
Invoke(std::index_sequence_for<Args...>());
event_.Set();
return false;
}
template <size_t... Is>
void Invoke(std::index_sequence<Is...>) {
r_.Invoke(c_, m_, std::move(std::get<Is>(args_))...);
@ -161,7 +158,7 @@ class MethodCall : public QueuedTask {
};
template <typename C, typename R, typename... Args>
class ConstMethodCall : public QueuedTask {
class ConstMethodCall {
public:
typedef R (C::*Method)(Args...) const;
ConstMethodCall(const C* c, Method m, Args&&... args)
@ -173,19 +170,16 @@ class ConstMethodCall : public QueuedTask {
if (t->IsCurrent()) {
Invoke(std::index_sequence_for<Args...>());
} else {
t->PostTask(std::unique_ptr<QueuedTask>(this));
t->PostTask([this] {
Invoke(std::index_sequence_for<Args...>());
event_.Set();
});
event_.Wait(rtc::Event::kForever);
}
return r_.moved_result();
}
private:
bool Run() override {
Invoke(std::index_sequence_for<Args...>());
event_.Set();
return false;
}
template <size_t... Is>
void Invoke(std::index_sequence<Is...>) {
r_.Invoke(c_, m_, std::move(std::get<Is>(args_))...);

View File

@ -21,6 +21,7 @@
#include <utility>
#include <vector>
#include "absl/functional/bind_front.h"
#include "absl/strings/string_view.h"
#include "api/array_view.h"
#include "api/candidate.h"
@ -1304,33 +1305,10 @@ void RTCStatsCollector::GetStatsReportInternal(
// We have a fresh cached report to deliver. Deliver asynchronously, since
// the caller may not be expecting a synchronous callback, and it avoids
// reentrancy problems.
std::vector<RequestInfo> requests;
requests.swap(requests_);
// Task subclass to take ownership of the requests.
// TODO(nisse): Delete when we can use C++14, and do lambda capture with
// std::move.
class DeliveryTask : public QueuedTask {
public:
DeliveryTask(rtc::scoped_refptr<RTCStatsCollector> collector,
rtc::scoped_refptr<const RTCStatsReport> cached_report,
std::vector<RequestInfo> requests)
: collector_(collector),
cached_report_(cached_report),
requests_(std::move(requests)) {}
bool Run() override {
collector_->DeliverCachedReport(cached_report_, std::move(requests_));
return true;
}
private:
rtc::scoped_refptr<RTCStatsCollector> collector_;
rtc::scoped_refptr<const RTCStatsReport> cached_report_;
std::vector<RequestInfo> requests_;
};
signaling_thread_->PostTask(std::make_unique<DeliveryTask>(
rtc::scoped_refptr<RTCStatsCollector>(this), cached_report_,
std::move(requests)));
signaling_thread_->PostTask(
absl::bind_front(&RTCStatsCollector::DeliverCachedReport,
rtc::scoped_refptr<RTCStatsCollector>(this),
cached_report_, std::move(requests_)));
} else if (!num_pending_partial_reports_) {
// Only start gathering stats if we're not already gathering stats. In the
// case of already gathering stats, `callback_` will be invoked when there

View File

@ -21,7 +21,6 @@
#include "api/peer_connection_interface.h"
#include "api/rtp_parameters.h"
#include "api/sequence_checker.h"
#include "api/task_queue/to_queued_task.h"
#include "media/base/codec.h"
#include "media/base/media_constants.h"
#include "media/base/media_engine.h"
@ -287,8 +286,8 @@ void RtpTransceiver::SetChannel(
channel_->SetRtpTransport(transport_lookup(channel_->mid()));
channel_->SetFirstPacketReceivedCallback(
[thread = thread_, flag = signaling_thread_safety_, this]() mutable {
thread->PostTask(ToQueuedTask(std::move(flag),
[this]() { OnFirstPacketReceived(); }));
thread->PostTask(
SafeTask(std::move(flag), [this]() { OnFirstPacketReceived(); }));
});
});
PushNewMediaChannelAndDeleteChannel(nullptr);

View File

@ -15,7 +15,7 @@
#include <string>
#include <utility>
#include "api/task_queue/to_queued_task.h"
#include "absl/cleanup/cleanup.h"
#include "media/sctp/sctp_transport_internal.h"
#include "pc/proxy.h"
#include "pc/sctp_utils.h"
@ -221,13 +221,12 @@ bool SctpDataChannel::Init() {
RTC_DCHECK(!controller_detached_);
if (controller_->ReadyToSendData()) {
AddRef();
rtc::Thread::Current()->PostTask(ToQueuedTask(
[this] {
absl::Cleanup release = [this] { Release(); };
rtc::Thread::Current()->PostTask([this, release = std::move(release)] {
RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ != kClosed)
OnTransportReady(true);
},
[this] { Release(); }));
});
}
return true;

View File

@ -64,8 +64,7 @@ TaskQueueMetronome::TaskQueueMetronome(TaskQueueFactory* factory,
tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] {
MutexLock lock(&mutex_);
for (auto* listener : listeners_) {
listener->OnTickTaskQueue()->PostTask(
ToQueuedTask([listener] { listener->OnTick(); }));
listener->OnTickTaskQueue()->PostTask([listener] { listener->OnTick(); });
}
return tick_period_;
});

View File

@ -54,9 +54,9 @@
#include "api/task_queue/default_task_queue_factory.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/to_queued_task.h"
#include "api/transport/field_trial_based_config.h"
#include "api/uma_metrics.h"
#include "api/units/time_delta.h"
#include "api/video/video_rotation.h"
#include "api/video_codecs/sdp_video_format.h"
#include "api/video_codecs/video_decoder_factory.h"
@ -1007,11 +1007,11 @@ class PeerConnectionIntegrationWrapper : public webrtc::PeerConnectionObserver,
RelaySdpMessageIfReceiverExists(type, msg);
} else {
rtc::Thread::Current()->PostDelayedTask(
ToQueuedTask(task_safety_.flag(),
SafeTask(task_safety_.flag(),
[this, type, msg] {
RelaySdpMessageIfReceiverExists(type, msg);
}),
signaling_delay_ms_);
TimeDelta::Millis(signaling_delay_ms_));
}
}
@ -1030,12 +1030,12 @@ class PeerConnectionIntegrationWrapper : public webrtc::PeerConnectionObserver,
RelayIceMessageIfReceiverExists(sdp_mid, sdp_mline_index, msg);
} else {
rtc::Thread::Current()->PostDelayedTask(
ToQueuedTask(task_safety_.flag(),
SafeTask(task_safety_.flag(),
[this, sdp_mid, sdp_mline_index, msg] {
RelayIceMessageIfReceiverExists(sdp_mid,
sdp_mline_index, msg);
RelayIceMessageIfReceiverExists(sdp_mid, sdp_mline_index,
msg);
}),
signaling_delay_ms_);
TimeDelta::Millis(signaling_delay_ms_));
}
}