Remove asyncinvoker from PeerConnection.
The callback that the asyncinvoker was being used for, will now use a safety flag to check if call_ is valid before issuing calls. Using the flag is a step towards removing the call_ptr_ variable but in this CL we're just looking at replacing use of the async invoker. The safety flag is cleared at the same time as call_ is, which prevents pending callbacks for that call instance from running. Also adding TODOs related to this change that will be followed upon in other CLs. Bug: webrtc:11988, webrtc:11992, webrtc:11993 Change-Id: If3986758af6d01d39b2db0cce82e57fc48be9d7f Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/185508 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Cr-Commit-Position: refs/heads/master@{#32208}
This commit is contained in:

committed by
Commit Bot

parent
b6103ff5f9
commit
1e40a0cabd
@ -274,6 +274,7 @@ rtc_library("peerconnection") {
|
|||||||
"../rtc_base/synchronization:sequence_checker",
|
"../rtc_base/synchronization:sequence_checker",
|
||||||
"../rtc_base/system:file_wrapper",
|
"../rtc_base/system:file_wrapper",
|
||||||
"../rtc_base/system:rtc_export",
|
"../rtc_base/system:rtc_export",
|
||||||
|
"../rtc_base/task_utils:pending_task_safety_flag",
|
||||||
"../rtc_base/task_utils:to_queued_task",
|
"../rtc_base/task_utils:to_queued_task",
|
||||||
"../rtc_base/third_party/base64",
|
"../rtc_base/third_party/base64",
|
||||||
"../rtc_base/third_party/sigslot",
|
"../rtc_base/third_party/sigslot",
|
||||||
|
@ -193,6 +193,9 @@ VoiceChannel* ChannelManager::CreateVoiceChannel(
|
|||||||
const webrtc::CryptoOptions& crypto_options,
|
const webrtc::CryptoOptions& crypto_options,
|
||||||
rtc::UniqueRandomIdGenerator* ssrc_generator,
|
rtc::UniqueRandomIdGenerator* ssrc_generator,
|
||||||
const AudioOptions& options) {
|
const AudioOptions& options) {
|
||||||
|
// TODO(bugs.webrtc.org/11992): Remove this workaround after updates in
|
||||||
|
// PeerConnection and add the expectation that we're already on the right
|
||||||
|
// thread.
|
||||||
if (!worker_thread_->IsCurrent()) {
|
if (!worker_thread_->IsCurrent()) {
|
||||||
return worker_thread_->Invoke<VoiceChannel*>(RTC_FROM_HERE, [&] {
|
return worker_thread_->Invoke<VoiceChannel*>(RTC_FROM_HERE, [&] {
|
||||||
return CreateVoiceChannel(call, media_config, rtp_transport,
|
return CreateVoiceChannel(call, media_config, rtp_transport,
|
||||||
@ -262,6 +265,9 @@ VideoChannel* ChannelManager::CreateVideoChannel(
|
|||||||
rtc::UniqueRandomIdGenerator* ssrc_generator,
|
rtc::UniqueRandomIdGenerator* ssrc_generator,
|
||||||
const VideoOptions& options,
|
const VideoOptions& options,
|
||||||
webrtc::VideoBitrateAllocatorFactory* video_bitrate_allocator_factory) {
|
webrtc::VideoBitrateAllocatorFactory* video_bitrate_allocator_factory) {
|
||||||
|
// TODO(bugs.webrtc.org/11992): Remove this workaround after updates in
|
||||||
|
// PeerConnection and add the expectation that we're already on the right
|
||||||
|
// thread.
|
||||||
if (!worker_thread_->IsCurrent()) {
|
if (!worker_thread_->IsCurrent()) {
|
||||||
return worker_thread_->Invoke<VideoChannel*>(RTC_FROM_HERE, [&] {
|
return worker_thread_->Invoke<VideoChannel*>(RTC_FROM_HERE, [&] {
|
||||||
return CreateVideoChannel(call, media_config, rtp_transport,
|
return CreateVideoChannel(call, media_config, rtp_transport,
|
||||||
|
@ -56,6 +56,7 @@
|
|||||||
#include "rtc_base/logging.h"
|
#include "rtc_base/logging.h"
|
||||||
#include "rtc_base/string_encode.h"
|
#include "rtc_base/string_encode.h"
|
||||||
#include "rtc_base/strings/string_builder.h"
|
#include "rtc_base/strings/string_builder.h"
|
||||||
|
#include "rtc_base/task_utils/to_queued_task.h"
|
||||||
#include "rtc_base/trace_event.h"
|
#include "rtc_base/trace_event.h"
|
||||||
#include "system_wrappers/include/clock.h"
|
#include "system_wrappers/include/clock.h"
|
||||||
#include "system_wrappers/include/metrics.h"
|
#include "system_wrappers/include/metrics.h"
|
||||||
@ -1038,6 +1039,8 @@ PeerConnection::PeerConnection(PeerConnectionFactory* factory,
|
|||||||
local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()),
|
local_ice_credentials_to_replace_(new LocalIceCredentialsToReplace()),
|
||||||
data_channel_controller_(this),
|
data_channel_controller_(this),
|
||||||
weak_ptr_factory_(this) {
|
weak_ptr_factory_(this) {
|
||||||
|
RTC_DCHECK(factory_);
|
||||||
|
// Note: call_ appears to be set to nullptr by some callers.
|
||||||
operations_chain_->SetOnChainEmptyCallback(
|
operations_chain_->SetOnChainEmptyCallback(
|
||||||
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr()]() {
|
[this_weak_ptr = weak_ptr_factory_.GetWeakPtr()]() {
|
||||||
if (!this_weak_ptr)
|
if (!this_weak_ptr)
|
||||||
@ -1082,6 +1085,7 @@ PeerConnection::~PeerConnection() {
|
|||||||
// call_ and event_log_ must be destroyed on the worker thread.
|
// call_ and event_log_ must be destroyed on the worker thread.
|
||||||
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
|
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
|
||||||
RTC_DCHECK_RUN_ON(worker_thread());
|
RTC_DCHECK_RUN_ON(worker_thread());
|
||||||
|
call_safety_.reset();
|
||||||
call_.reset();
|
call_.reset();
|
||||||
// The event log must outlive call (and any other object that uses it).
|
// The event log must outlive call (and any other object that uses it).
|
||||||
event_log_.reset();
|
event_log_.reset();
|
||||||
@ -1215,25 +1219,7 @@ bool PeerConnection::Initialize(
|
|||||||
? *configuration.crypto_options
|
? *configuration.crypto_options
|
||||||
: options.crypto_options;
|
: options.crypto_options;
|
||||||
config.transport_observer = this;
|
config.transport_observer = this;
|
||||||
// It's safe to pass |this| and using |rtcp_invoker_| and the |call_| pointer
|
config.rtcp_handler = InitializeRtcpCallback();
|
||||||
// since the JsepTransportController instance is owned by this PeerConnection
|
|
||||||
// instance and is destroyed before both |rtcp_invoker_| and the |call_|
|
|
||||||
// pointer.
|
|
||||||
config.rtcp_handler = [this](const rtc::CopyOnWriteBuffer& packet,
|
|
||||||
int64_t packet_time_us) {
|
|
||||||
RTC_DCHECK_RUN_ON(network_thread());
|
|
||||||
rtcp_invoker_.AsyncInvoke<void>(
|
|
||||||
RTC_FROM_HERE, worker_thread(), [this, packet, packet_time_us] {
|
|
||||||
RTC_DCHECK_RUN_ON(worker_thread());
|
|
||||||
// |call_| is reset on the worker thread in the PeerConnection
|
|
||||||
// destructor, so we check that it's still valid before propagating
|
|
||||||
// the packet.
|
|
||||||
if (call_) {
|
|
||||||
call_->Receiver()->DeliverPacket(MediaType::ANY, packet,
|
|
||||||
packet_time_us);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
};
|
|
||||||
config.event_log = event_log_ptr_;
|
config.event_log = event_log_ptr_;
|
||||||
#if defined(ENABLE_EXTERNAL_AUTH)
|
#if defined(ENABLE_EXTERNAL_AUTH)
|
||||||
config.enable_external_auth = true;
|
config.enable_external_auth = true;
|
||||||
@ -3619,6 +3605,13 @@ RTCError PeerConnection::UpdateTransceiverChannel(
|
|||||||
const cricket::ContentGroup* bundle_group) {
|
const cricket::ContentGroup* bundle_group) {
|
||||||
RTC_DCHECK(IsUnifiedPlan());
|
RTC_DCHECK(IsUnifiedPlan());
|
||||||
RTC_DCHECK(transceiver);
|
RTC_DCHECK(transceiver);
|
||||||
|
// TODO(bugs.webrtc.org/11992): This function always returns RTCError::OK().
|
||||||
|
// Some of the below methods, specifically Create & Destroy, need to be called
|
||||||
|
// on the worker thread. Consider if there should be a split here where we do
|
||||||
|
// things asynchronously in two steps and change the return type of the
|
||||||
|
// function to be void. Note that in the case of 'create', that would/could
|
||||||
|
// mean that SetChannel might get called at a much later stage than it happens
|
||||||
|
// now.
|
||||||
cricket::ChannelInterface* channel = transceiver->internal()->channel();
|
cricket::ChannelInterface* channel = transceiver->internal()->channel();
|
||||||
if (content.rejected) {
|
if (content.rejected) {
|
||||||
if (channel) {
|
if (channel) {
|
||||||
@ -4462,6 +4455,7 @@ void PeerConnection::Close() {
|
|||||||
|
|
||||||
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
|
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
|
||||||
RTC_DCHECK_RUN_ON(worker_thread());
|
RTC_DCHECK_RUN_ON(worker_thread());
|
||||||
|
call_safety_.reset();
|
||||||
call_.reset();
|
call_.reset();
|
||||||
// The event log must outlive call (and any other object that uses it).
|
// The event log must outlive call (and any other object that uses it).
|
||||||
event_log_.reset();
|
event_log_.reset();
|
||||||
@ -6676,6 +6670,9 @@ cricket::VoiceChannel* PeerConnection::CreateVoiceChannel(
|
|||||||
const std::string& mid) {
|
const std::string& mid) {
|
||||||
RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
|
RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
|
||||||
|
|
||||||
|
// TODO(bugs.webrtc.org/11992): CreateVoiceChannel internally switches to the
|
||||||
|
// worker thread. We shouldn't be using the |call_ptr_| hack here but simply
|
||||||
|
// be on the worker thread and use |call_| (update upstream code).
|
||||||
cricket::VoiceChannel* voice_channel = channel_manager()->CreateVoiceChannel(
|
cricket::VoiceChannel* voice_channel = channel_manager()->CreateVoiceChannel(
|
||||||
call_ptr_, configuration_.media_config, rtp_transport, signaling_thread(),
|
call_ptr_, configuration_.media_config, rtp_transport, signaling_thread(),
|
||||||
mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_,
|
mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_,
|
||||||
@ -6697,6 +6694,9 @@ cricket::VideoChannel* PeerConnection::CreateVideoChannel(
|
|||||||
const std::string& mid) {
|
const std::string& mid) {
|
||||||
RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
|
RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
|
||||||
|
|
||||||
|
// TODO(bugs.webrtc.org/11992): CreateVideoChannel internally switches to the
|
||||||
|
// worker thread. We shouldn't be using the |call_ptr_| hack here but simply
|
||||||
|
// be on the worker thread and use |call_| (update upstream code).
|
||||||
cricket::VideoChannel* video_channel = channel_manager()->CreateVideoChannel(
|
cricket::VideoChannel* video_channel = channel_manager()->CreateVideoChannel(
|
||||||
call_ptr_, configuration_.media_config, rtp_transport, signaling_thread(),
|
call_ptr_, configuration_.media_config, rtp_transport, signaling_thread(),
|
||||||
mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_, video_options_,
|
mid, SrtpRequired(), GetCryptoOptions(), &ssrc_generator_, video_options_,
|
||||||
@ -7367,6 +7367,10 @@ void PeerConnection::DestroyDataChannelTransport() {
|
|||||||
|
|
||||||
void PeerConnection::DestroyChannelInterface(
|
void PeerConnection::DestroyChannelInterface(
|
||||||
cricket::ChannelInterface* channel) {
|
cricket::ChannelInterface* channel) {
|
||||||
|
// TODO(bugs.webrtc.org/11992): All the below methods should be called on the
|
||||||
|
// worker thread. (they switch internally anyway). Change
|
||||||
|
// DestroyChannelInterface to either be called on the worker thread, or do
|
||||||
|
// this asynchronously on the worker.
|
||||||
RTC_DCHECK(channel);
|
RTC_DCHECK(channel);
|
||||||
switch (channel->media_type()) {
|
switch (channel->media_type()) {
|
||||||
case cricket::MEDIA_TYPE_AUDIO:
|
case cricket::MEDIA_TYPE_AUDIO:
|
||||||
@ -7786,4 +7790,37 @@ RTCError PeerConnection::Rollback(SdpType desc_type) {
|
|||||||
return RTCError::OK();
|
return RTCError::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::function<void(const rtc::CopyOnWriteBuffer& packet,
|
||||||
|
int64_t packet_time_us)>
|
||||||
|
PeerConnection::InitializeRtcpCallback() {
|
||||||
|
RTC_DCHECK_RUN_ON(signaling_thread());
|
||||||
|
|
||||||
|
auto flag =
|
||||||
|
worker_thread()->Invoke<rtc::scoped_refptr<PendingTaskSafetyFlag>>(
|
||||||
|
RTC_FROM_HERE, [this] {
|
||||||
|
RTC_DCHECK_RUN_ON(worker_thread());
|
||||||
|
if (!call_)
|
||||||
|
return rtc::scoped_refptr<PendingTaskSafetyFlag>();
|
||||||
|
if (!call_safety_)
|
||||||
|
call_safety_.reset(new ScopedTaskSafety());
|
||||||
|
return call_safety_->flag();
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!flag)
|
||||||
|
return [](const rtc::CopyOnWriteBuffer&, int64_t) {};
|
||||||
|
|
||||||
|
return [this, flag = std::move(flag)](const rtc::CopyOnWriteBuffer& packet,
|
||||||
|
int64_t packet_time_us) {
|
||||||
|
RTC_DCHECK_RUN_ON(network_thread());
|
||||||
|
// TODO(bugs.webrtc.org/11993): We should actually be delivering this call
|
||||||
|
// directly to the Call class somehow directly on the network thread and not
|
||||||
|
// incur this hop here. The DeliverPacket() method will eventually just have
|
||||||
|
// to hop back over to the network thread.
|
||||||
|
worker_thread()->PostTask(ToQueuedTask(flag, [this, packet,
|
||||||
|
packet_time_us] {
|
||||||
|
RTC_DCHECK_RUN_ON(worker_thread());
|
||||||
|
call_->Receiver()->DeliverPacket(MediaType::ANY, packet, packet_time_us);
|
||||||
|
}));
|
||||||
|
};
|
||||||
|
}
|
||||||
} // namespace webrtc
|
} // namespace webrtc
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
#ifndef PC_PEER_CONNECTION_H_
|
#ifndef PC_PEER_CONNECTION_H_
|
||||||
#define PC_PEER_CONNECTION_H_
|
#define PC_PEER_CONNECTION_H_
|
||||||
|
|
||||||
|
#include <functional>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <set>
|
#include <set>
|
||||||
@ -36,6 +37,7 @@
|
|||||||
#include "rtc_base/experiments/field_trial_parser.h"
|
#include "rtc_base/experiments/field_trial_parser.h"
|
||||||
#include "rtc_base/operations_chain.h"
|
#include "rtc_base/operations_chain.h"
|
||||||
#include "rtc_base/race_checker.h"
|
#include "rtc_base/race_checker.h"
|
||||||
|
#include "rtc_base/task_utils/pending_task_safety_flag.h"
|
||||||
#include "rtc_base/unique_id_generator.h"
|
#include "rtc_base/unique_id_generator.h"
|
||||||
#include "rtc_base/weak_ptr.h"
|
#include "rtc_base/weak_ptr.h"
|
||||||
|
|
||||||
@ -1149,6 +1151,10 @@ class PeerConnection : public PeerConnectionInternal,
|
|||||||
// | desc_type | is the type of the description that caused the rollback.
|
// | desc_type | is the type of the description that caused the rollback.
|
||||||
RTCError Rollback(SdpType desc_type);
|
RTCError Rollback(SdpType desc_type);
|
||||||
|
|
||||||
|
std::function<void(const rtc::CopyOnWriteBuffer& packet,
|
||||||
|
int64_t packet_time_us)>
|
||||||
|
InitializeRtcpCallback();
|
||||||
|
|
||||||
// Storing the factory as a scoped reference pointer ensures that the memory
|
// Storing the factory as a scoped reference pointer ensures that the memory
|
||||||
// in the PeerConnectionFactoryImpl remains available as long as the
|
// in the PeerConnectionFactoryImpl remains available as long as the
|
||||||
// PeerConnection is running. It is passed to PeerConnection as a raw pointer.
|
// PeerConnection is running. It is passed to PeerConnection as a raw pointer.
|
||||||
@ -1234,11 +1240,13 @@ class PeerConnection : public PeerConnectionInternal,
|
|||||||
// The unique_ptr belongs to the worker thread, but the Call object manages
|
// The unique_ptr belongs to the worker thread, but the Call object manages
|
||||||
// its own thread safety.
|
// its own thread safety.
|
||||||
std::unique_ptr<Call> call_ RTC_GUARDED_BY(worker_thread());
|
std::unique_ptr<Call> call_ RTC_GUARDED_BY(worker_thread());
|
||||||
|
std::unique_ptr<ScopedTaskSafety> call_safety_
|
||||||
rtc::AsyncInvoker rtcp_invoker_ RTC_GUARDED_BY(network_thread());
|
RTC_GUARDED_BY(worker_thread());
|
||||||
|
|
||||||
// Points to the same thing as `call_`. Since it's const, we may read the
|
// Points to the same thing as `call_`. Since it's const, we may read the
|
||||||
// pointer from any thread.
|
// pointer from any thread.
|
||||||
|
// TODO(bugs.webrtc.org/11992): Remove this workaround (and potential dangling
|
||||||
|
// pointer).
|
||||||
Call* const call_ptr_;
|
Call* const call_ptr_;
|
||||||
|
|
||||||
std::unique_ptr<StatsCollector> stats_
|
std::unique_ptr<StatsCollector> stats_
|
||||||
|
Reference in New Issue
Block a user