Revert "Remove thread hops from events provided by JsepTransportController."

This reverts commit f554b3c577f69fa9ffad5c07155898c2d985ac76.

Reason for revert: Parent CL breaks FYI bots.
See https://webrtc-review.googlesource.com/c/src/+/206466

Original change's description:
> Remove thread hops from events provided by JsepTransportController.
>
> Events associated with Subscribe* methods in JTC had trampolines that
> would use an async invoker to fire the events on the signaling thread.
> This was being done for the purposes of PeerConnection but the concept
> of a signaling thread is otherwise not applicable to JTC and use of
> JTC from PC is inconsistent across threads (as has been flagged in
> webrtc:9987).
>
> This change makes all CallbackList members only accessible from the
> network thread and moves the signaling thread related work over to
> PeerConnection, which makes hops there more visible as well as making
> that class easier to refactor for thread efficiency.
>
> This CL removes the AsyncInvoker from JTC (webrtc:12339)
>
> The signaling_thread_ variable is also removed from JTC and more thread
> checks added to catch errors.
>
> Bug: webrtc:12427, webrtc:11988, webrtc:12339
> Change-Id: Id232aedd00dfd5403b2ba0ca147d3eca7c12c7c5
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/206062
> Commit-Queue: Tommi <tommi@webrtc.org>
> Reviewed-by: Niels Moller <nisse@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33195}

TBR=nisse@webrtc.org,tommi@webrtc.org

Change-Id: I6134b71b74a9408854b79d44506d513519e9cf4d
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:12427
Bug: webrtc:11988
Bug: webrtc:12339
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/206467
Reviewed-by: Guido Urdaneta <guidou@webrtc.org>
Commit-Queue: Guido Urdaneta <guidou@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33203}
This commit is contained in:
Guido Urdaneta
2021-02-09 12:26:13 +00:00
committed by Commit Bot
parent 82ce7e5515
commit 6e4fcac313
8 changed files with 269 additions and 263 deletions

View File

@ -62,6 +62,7 @@ class ConnectionContext : public rtc::RefCountInterface {
// Functions called from PeerConnection and friends
SctpTransportFactoryInterface* sctp_transport_factory() const {
RTC_DCHECK_RUN_ON(signaling_thread_);
return sctp_factory_.get();
}
@ -122,7 +123,8 @@ class ConnectionContext : public rtc::RefCountInterface {
RTC_GUARDED_BY(signaling_thread_);
std::unique_ptr<cricket::MediaEngineInterface> media_engine_
RTC_GUARDED_BY(signaling_thread_);
std::unique_ptr<SctpTransportFactoryInterface> const sctp_factory_;
std::unique_ptr<SctpTransportFactoryInterface> const sctp_factory_
RTC_GUARDED_BY(signaling_thread_);
// Accessed both on signaling thread and worker thread.
std::unique_ptr<WebRtcKeyValueConfig> const trials_;
};

View File

@ -84,11 +84,13 @@ webrtc::RTCError VerifyCandidates(const cricket::Candidates& candidates) {
namespace webrtc {
JsepTransportController::JsepTransportController(
rtc::Thread* signaling_thread,
rtc::Thread* network_thread,
cricket::PortAllocator* port_allocator,
AsyncResolverFactory* async_resolver_factory,
Config config)
: network_thread_(network_thread),
: signaling_thread_(signaling_thread),
network_thread_(network_thread),
port_allocator_(port_allocator),
async_resolver_factory_(async_resolver_factory),
config_(config),
@ -220,6 +222,12 @@ void JsepTransportController::SetNeedsIceRestartFlag() {
bool JsepTransportController::NeedsIceRestart(
const std::string& transport_name) const {
if (!network_thread_->IsCurrent()) {
RTC_DCHECK_RUN_ON(signaling_thread_);
return network_thread_->Invoke<bool>(
RTC_FROM_HERE, [&] { return NeedsIceRestart(transport_name); });
}
RTC_DCHECK_RUN_ON(network_thread_);
const cricket::JsepTransport* transport =
@ -406,6 +414,11 @@ RTCError JsepTransportController::RemoveRemoteCandidates(
bool JsepTransportController::GetStats(const std::string& transport_name,
cricket::TransportStats* stats) {
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<bool>(
RTC_FROM_HERE, [=] { return GetStats(transport_name, stats); });
}
RTC_DCHECK_RUN_ON(network_thread_);
cricket::JsepTransport* transport = GetJsepTransportByName(transport_name);
@ -1181,24 +1194,35 @@ void JsepTransportController::OnTransportCandidateGathered_n(
RTC_NOTREACHED();
return;
}
signal_ice_candidates_gathered_.Send(
transport->transport_name(), std::vector<cricket::Candidate>{candidate});
std::string transport_name = transport->transport_name();
// TODO(bugs.webrtc.org/12427): See if we can get rid of this. We should be
// able to just call this directly here.
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] {
signal_ice_candidates_gathered_.Send(
transport_name, std::vector<cricket::Candidate>{candidate});
});
}
void JsepTransportController::OnTransportCandidateError_n(
cricket::IceTransportInternal* transport,
const cricket::IceCandidateErrorEvent& event) {
signal_ice_candidate_error_.Send(event);
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, [this, event] {
signal_ice_candidate_error_.Send(event);
});
}
void JsepTransportController::OnTransportCandidatesRemoved_n(
cricket::IceTransportInternal* transport,
const cricket::Candidates& candidates) {
signal_ice_candidates_removed_.Send(candidates);
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_,
[this, candidates] { signal_ice_candidates_removed_.Send(candidates); });
}
void JsepTransportController::OnTransportCandidatePairChanged_n(
const cricket::CandidatePairChangeEvent& event) {
signal_ice_candidate_pair_changed_.Send(event);
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, [this, event] {
signal_ice_candidate_pair_changed_.Send(event);
});
}
void JsepTransportController::OnTransportRoleConflict_n(
@ -1274,7 +1298,10 @@ void JsepTransportController::UpdateAggregateStates_n() {
if (ice_connection_state_ != new_connection_state) {
ice_connection_state_ = new_connection_state;
signal_ice_connection_state_.Send(new_connection_state);
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, new_connection_state] {
signal_ice_connection_state_.Send(new_connection_state);
});
}
// Compute the current RTCIceConnectionState as described in
@ -1330,11 +1357,17 @@ void JsepTransportController::UpdateAggregateStates_n() {
new_ice_connection_state ==
PeerConnectionInterface::kIceConnectionCompleted) {
// Ensure that we never skip over the "connected" state.
signal_standardized_ice_connection_state_.Send(
PeerConnectionInterface::kIceConnectionConnected);
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, [this] {
signal_standardized_ice_connection_state_.Send(
PeerConnectionInterface::kIceConnectionConnected);
});
}
standardized_ice_connection_state_ = new_ice_connection_state;
signal_standardized_ice_connection_state_.Send(new_ice_connection_state);
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_,
[this, new_ice_connection_state] {
signal_standardized_ice_connection_state_.Send(
new_ice_connection_state);
});
}
// Compute the current RTCPeerConnectionState as described in
@ -1385,7 +1418,10 @@ void JsepTransportController::UpdateAggregateStates_n() {
if (combined_connection_state_ != new_combined_state) {
combined_connection_state_ = new_combined_state;
signal_connection_state_.Send(new_combined_state);
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, new_combined_state] {
signal_connection_state_.Send(new_combined_state);
});
}
// Compute the gathering state.
@ -1398,7 +1434,10 @@ void JsepTransportController::UpdateAggregateStates_n() {
}
if (ice_gathering_state_ != new_gathering_state) {
ice_gathering_state_ = new_gathering_state;
signal_ice_gathering_state_.Send(new_gathering_state);
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, new_gathering_state] {
signal_ice_gathering_state_.Send(new_gathering_state);
});
}
}

View File

@ -54,6 +54,7 @@
#include "pc/session_description.h"
#include "pc/srtp_transport.h"
#include "pc/transport_stats.h"
#include "rtc_base/async_invoker.h"
#include "rtc_base/callback_list.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/copy_on_write_buffer.h"
@ -136,11 +137,10 @@ class JsepTransportController : public sigslot::has_slots<> {
std::function<void(const rtc::SSLHandshakeError)> on_dtls_handshake_error_;
};
// The ICE related events are fired on the |network_thread|.
// All the transport related methods are called on the |network_thread|
// and destruction of the JsepTransportController must occur on the
// |network_thread|.
JsepTransportController(rtc::Thread* network_thread,
// The ICE related events are signaled on the |signaling_thread|.
// All the transport related methods are called on the |network_thread|.
JsepTransportController(rtc::Thread* signaling_thread,
rtc::Thread* network_thread,
cricket::PortAllocator* port_allocator,
AsyncResolverFactory* async_resolver_factory,
Config config);
@ -227,28 +227,26 @@ class JsepTransportController : public sigslot::has_slots<> {
// F: void(const std::string&, const std::vector<cricket::Candidate>&)
template <typename F>
void SubscribeIceCandidateGathered(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
// TODO(bugs.webrtc.org/12427): Post this subscription to the network
// thread.
signal_ice_candidates_gathered_.AddReceiver(std::forward<F>(callback));
}
// F: void(cricket::IceConnectionState)
template <typename F>
void SubscribeIceConnectionState(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_connection_state_.AddReceiver(std::forward<F>(callback));
}
// F: void(PeerConnectionInterface::PeerConnectionState)
template <typename F>
void SubscribeConnectionState(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_connection_state_.AddReceiver(std::forward<F>(callback));
}
// F: void(PeerConnectionInterface::IceConnectionState)
template <typename F>
void SubscribeStandardizedIceConnectionState(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_standardized_ice_connection_state_.AddReceiver(
std::forward<F>(callback));
}
@ -256,65 +254,60 @@ class JsepTransportController : public sigslot::has_slots<> {
// F: void(cricket::IceGatheringState)
template <typename F>
void SubscribeIceGatheringState(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_gathering_state_.AddReceiver(std::forward<F>(callback));
}
// F: void(const cricket::IceCandidateErrorEvent&)
template <typename F>
void SubscribeIceCandidateError(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_candidate_error_.AddReceiver(std::forward<F>(callback));
}
// F: void(const std::vector<cricket::Candidate>&)
template <typename F>
void SubscribeIceCandidatesRemoved(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_candidates_removed_.AddReceiver(std::forward<F>(callback));
}
// F: void(const cricket::CandidatePairChangeEvent&)
template <typename F>
void SubscribeIceCandidatePairChanged(F&& callback) {
RTC_DCHECK_RUN_ON(network_thread_);
signal_ice_candidate_pair_changed_.AddReceiver(std::forward<F>(callback));
}
private:
// All of these callbacks are fired on the network thread.
// All of these callbacks are fired on the signaling thread.
// If any transport failed => failed,
// Else if all completed => completed,
// Else if all connected => connected,
// Else => connecting
CallbackList<cricket::IceConnectionState> signal_ice_connection_state_
RTC_GUARDED_BY(network_thread_);
CallbackList<cricket::IceConnectionState> signal_ice_connection_state_;
CallbackList<PeerConnectionInterface::PeerConnectionState>
signal_connection_state_ RTC_GUARDED_BY(network_thread_);
signal_connection_state_;
CallbackList<PeerConnectionInterface::IceConnectionState>
signal_standardized_ice_connection_state_ RTC_GUARDED_BY(network_thread_);
signal_standardized_ice_connection_state_;
// If all transports done gathering => complete,
// Else if any are gathering => gathering,
// Else => new
CallbackList<cricket::IceGatheringState> signal_ice_gathering_state_
RTC_GUARDED_BY(network_thread_);
CallbackList<cricket::IceGatheringState> signal_ice_gathering_state_;
// [mid, candidates]
// TODO(bugs.webrtc.org/12427): Protect this with network_thread_.
CallbackList<const std::string&, const std::vector<cricket::Candidate>&>
signal_ice_candidates_gathered_ RTC_GUARDED_BY(network_thread_);
signal_ice_candidates_gathered_;
CallbackList<const cricket::IceCandidateErrorEvent&>
signal_ice_candidate_error_ RTC_GUARDED_BY(network_thread_);
signal_ice_candidate_error_;
CallbackList<const std::vector<cricket::Candidate>&>
signal_ice_candidates_removed_ RTC_GUARDED_BY(network_thread_);
signal_ice_candidates_removed_;
CallbackList<const cricket::CandidatePairChangeEvent&>
signal_ice_candidate_pair_changed_ RTC_GUARDED_BY(network_thread_);
signal_ice_candidate_pair_changed_;
RTCError ApplyDescription_n(bool local,
SdpType type,
@ -459,6 +452,7 @@ class JsepTransportController : public sigslot::has_slots<> {
void OnDtlsHandshakeError(rtc::SSLHandshakeError error);
rtc::Thread* const signaling_thread_ = nullptr;
rtc::Thread* const network_thread_ = nullptr;
cricket::PortAllocator* const port_allocator_ = nullptr;
AsyncResolverFactory* const async_resolver_factory_ = nullptr;
@ -496,6 +490,7 @@ class JsepTransportController : public sigslot::has_slots<> {
cricket::IceRole ice_role_ = cricket::ICEROLE_CONTROLLING;
uint64_t ice_tiebreaker_ = rtc::CreateRandomId64();
rtc::scoped_refptr<rtc::RTCCertificate> certificate_;
rtc::AsyncInvoker invoker_;
RTC_DISALLOW_COPY_AND_ASSIGN(JsepTransportController);
};

View File

@ -74,6 +74,7 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
void CreateJsepTransportController(
JsepTransportController::Config config,
rtc::Thread* signaling_thread = rtc::Thread::Current(),
rtc::Thread* network_thread = rtc::Thread::Current(),
cricket::PortAllocator* port_allocator = nullptr) {
config.transport_observer = this;
@ -83,10 +84,9 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
config.dtls_transport_factory = fake_dtls_transport_factory_.get();
config.on_dtls_handshake_error_ = [](rtc::SSLHandshakeError s) {};
transport_controller_ = std::make_unique<JsepTransportController>(
network_thread, port_allocator, nullptr /* async_resolver_factory */,
config);
network_thread->Invoke<void>(RTC_FROM_HERE,
[&] { ConnectTransportControllerSignals(); });
signaling_thread, network_thread, port_allocator,
nullptr /* async_resolver_factory */, config);
ConnectTransportControllerSignals();
}
void ConnectTransportControllerSignals() {
@ -276,14 +276,18 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
protected:
void OnConnectionState(cricket::IceConnectionState state) {
ice_signaled_on_thread_ = rtc::Thread::Current();
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
connection_state_ = state;
++connection_state_signal_count_;
}
void OnStandardizedIceConnectionState(
PeerConnectionInterface::IceConnectionState state) {
ice_signaled_on_thread_ = rtc::Thread::Current();
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
ice_connection_state_ = state;
++ice_connection_state_signal_count_;
}
@ -292,20 +296,26 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
PeerConnectionInterface::PeerConnectionState state) {
RTC_LOG(LS_INFO) << "OnCombinedConnectionState: "
<< static_cast<int>(state);
ice_signaled_on_thread_ = rtc::Thread::Current();
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
combined_connection_state_ = state;
++combined_connection_state_signal_count_;
}
void OnGatheringState(cricket::IceGatheringState state) {
ice_signaled_on_thread_ = rtc::Thread::Current();
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
gathering_state_ = state;
++gathering_state_signal_count_;
}
void OnCandidatesGathered(const std::string& transport_name,
const Candidates& candidates) {
ice_signaled_on_thread_ = rtc::Thread::Current();
if (!signaling_thread_->IsCurrent()) {
signaled_on_non_signaling_thread_ = true;
}
candidates_[transport_name].insert(candidates_[transport_name].end(),
candidates.begin(), candidates.end());
++candidates_signal_count_;
@ -350,7 +360,7 @@ class JsepTransportControllerTest : public JsepTransportController::Observer,
std::unique_ptr<FakeIceTransportFactory> fake_ice_transport_factory_;
std::unique_ptr<FakeDtlsTransportFactory> fake_dtls_transport_factory_;
rtc::Thread* const signaling_thread_ = nullptr;
rtc::Thread* ice_signaled_on_thread_ = nullptr;
bool signaled_on_non_signaling_thread_ = false;
// Used to verify the SignalRtpTransportChanged/SignalDtlsTransportChanged are
// signaled correctly.
std::map<std::string, RtpTransportInternal*> changed_rtp_transport_by_mid_;
@ -873,12 +883,11 @@ TEST_F(JsepTransportControllerTest, SignalCandidatesGathered) {
EXPECT_EQ(1u, candidates_[kAudioMid1].size());
}
TEST_F(JsepTransportControllerTest, IceSignalingOccursOnNetworkThread) {
TEST_F(JsepTransportControllerTest, IceSignalingOccursOnSignalingThread) {
network_thread_ = rtc::Thread::CreateWithSocketServer();
network_thread_->Start();
EXPECT_EQ(ice_signaled_on_thread_, nullptr);
CreateJsepTransportController(JsepTransportController::Config(),
network_thread_.get(),
signaling_thread_, network_thread_.get(),
/*port_allocator=*/nullptr);
CreateLocalDescriptionAndCompleteConnectionOnNetworkThread();
@ -894,7 +903,7 @@ TEST_F(JsepTransportControllerTest, IceSignalingOccursOnNetworkThread) {
EXPECT_EQ_WAIT(1u, candidates_[kVideoMid1].size(), kTimeout);
EXPECT_EQ(2, candidates_signal_count_);
EXPECT_EQ(ice_signaled_on_thread_, network_thread_.get());
EXPECT_TRUE(!signaled_on_non_signaling_thread_);
network_thread_->Invoke<void>(RTC_FROM_HERE,
[&] { transport_controller_.reset(); });

View File

@ -88,6 +88,7 @@ const char kSimulcastNumberOfEncodings[] =
static const int REPORT_USAGE_PATTERN_DELAY_MS = 60000;
uint32_t ConvertIceTransportTypeToCandidateFilter(
PeerConnectionInterface::IceTransportsType type) {
switch (type) {
@ -263,20 +264,6 @@ bool HasRtcpMuxEnabled(const cricket::ContentInfo* content) {
return content->media_description()->rtcp_mux();
}
bool DtlsEnabled(const PeerConnectionInterface::RTCConfiguration& configuration,
const PeerConnectionFactoryInterface::Options& options,
const PeerConnectionDependencies& dependencies) {
if (options.disable_encryption)
return false;
// Enable DTLS by default if we have an identity store or a certificate.
bool default_enabled =
(dependencies.cert_generator || !configuration.certificates.empty());
// The |configuration| can override the default value.
return configuration.enable_dtls_srtp.value_or(default_enabled);
}
} // namespace
bool PeerConnectionInterface::RTCConfiguration::operator==(
@ -434,12 +421,11 @@ RTCErrorOr<rtc::scoped_refptr<PeerConnection>> PeerConnection::Create(
bool is_unified_plan =
configuration.sdp_semantics == SdpSemantics::kUnifiedPlan;
bool dtls_enabled = DtlsEnabled(configuration, options, dependencies);
// The PeerConnection constructor consumes some, but not all, dependencies.
rtc::scoped_refptr<PeerConnection> pc(
new rtc::RefCountedObject<PeerConnection>(
context, options, is_unified_plan, std::move(event_log),
std::move(call), dependencies, dtls_enabled));
std::move(call), dependencies));
RTCError init_error = pc->Initialize(configuration, std::move(dependencies));
if (!init_error.ok()) {
RTC_LOG(LS_ERROR) << "PeerConnection initialization failed";
@ -454,8 +440,7 @@ PeerConnection::PeerConnection(
bool is_unified_plan,
std::unique_ptr<RtcEventLog> event_log,
std::unique_ptr<Call> call,
PeerConnectionDependencies& dependencies,
bool dtls_enabled)
PeerConnectionDependencies& dependencies)
: context_(context),
options_(options),
observer_(dependencies.observer),
@ -468,17 +453,9 @@ PeerConnection::PeerConnection(
tls_cert_verifier_(std::move(dependencies.tls_cert_verifier)),
call_(std::move(call)),
call_ptr_(call_.get()),
dtls_enabled_(dtls_enabled),
data_channel_controller_(this),
message_handler_(signaling_thread()),
weak_factory_(this) {
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
worker_thread_safety_ = PendingTaskSafetyFlag::Create();
if (!call_)
worker_thread_safety_->SetNotAlive();
});
}
weak_factory_(this) {}
PeerConnection::~PeerConnection() {
TRACE_EVENT0("webrtc", "PeerConnection::~PeerConnection");
@ -519,13 +496,15 @@ PeerConnection::~PeerConnection() {
RTC_DCHECK_RUN_ON(network_thread());
transport_controller_.reset();
port_allocator_.reset();
if (network_thread_safety_)
if (network_thread_safety_) {
network_thread_safety_->SetNotAlive();
network_thread_safety_ = nullptr;
}
});
// call_ and event_log_ must be destroyed on the worker thread.
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
worker_thread_safety_->SetNotAlive();
call_safety_.reset();
call_.reset();
// The event log must outlive call (and any other object that uses it).
event_log_.reset();
@ -552,6 +531,20 @@ RTCError PeerConnection::Initialize(
turn_server.turn_logging_id = configuration.turn_logging_id;
}
// The port allocator lives on the network thread and should be initialized
// there. Also set up the task safety flag for canceling pending tasks on
// the network thread when closing.
// TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and
// initialize all the |transport_controller_->Subscribe*| calls below on the
// network thread via this invoke.
const auto pa_result =
network_thread()->Invoke<InitializePortAllocatorResult>(
RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] {
network_thread_safety_ = PendingTaskSafetyFlag::Create();
return InitializePortAllocator_n(stun_servers, turn_servers,
configuration);
});
// Note if STUN or TURN servers were supplied.
if (!stun_servers.empty()) {
NoteUsageEvent(UsageEvent::STUN_SERVER_ADDED);
@ -560,11 +553,52 @@ RTCError PeerConnection::Initialize(
NoteUsageEvent(UsageEvent::TURN_SERVER_ADDED);
}
// Send information about IPv4/IPv6 status.
PeerConnectionAddressFamilyCounter address_family;
if (pa_result.enable_ipv6) {
address_family = kPeerConnection_IPv6;
} else {
address_family = kPeerConnection_IPv4;
}
RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family,
kPeerConnectionAddressFamilyCounter_Max);
// RFC 3264: The numeric value of the session id and version in the
// o line MUST be representable with a "64 bit signed integer".
// Due to this constraint session id |session_id_| is max limited to
// LLONG_MAX.
session_id_ = rtc::ToString(rtc::CreateRandomId64() & LLONG_MAX);
JsepTransportController::Config config;
config.redetermine_role_on_ice_restart =
configuration.redetermine_role_on_ice_restart;
config.ssl_max_version = options_.ssl_max_version;
config.disable_encryption = options_.disable_encryption;
config.bundle_policy = configuration.bundle_policy;
config.rtcp_mux_policy = configuration.rtcp_mux_policy;
// TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove
// this stub.
config.crypto_options = configuration.crypto_options.has_value()
? *configuration.crypto_options
: options_.crypto_options;
config.transport_observer = this;
config.rtcp_handler = InitializeRtcpCallback();
config.event_log = event_log_ptr_;
#if defined(ENABLE_EXTERNAL_AUTH)
config.enable_external_auth = true;
#endif
config.active_reset_srtp_params = configuration.active_reset_srtp_params;
if (options_.disable_encryption) {
dtls_enabled_ = false;
} else {
// Enable DTLS by default if we have an identity store or a certificate.
dtls_enabled_ =
(dependencies.cert_generator || !configuration.certificates.empty());
// |configuration| can override the default |dtls_enabled_| value.
if (configuration.enable_dtls_srtp) {
dtls_enabled_ = *(configuration.enable_dtls_srtp);
}
}
if (configuration.enable_rtp_data_channel) {
// Enable creation of RTP data channels if the kEnableRtpDataChannels is
@ -575,27 +609,77 @@ RTCError PeerConnection::Initialize(
// DTLS has to be enabled to use SCTP.
if (!options_.disable_sctp_data_channels && dtls_enabled_) {
data_channel_controller_.set_data_channel_type(cricket::DCT_SCTP);
config.sctp_factory = context_->sctp_transport_factory();
}
}
// Network thread initialization.
network_thread()->Invoke<void>(RTC_FROM_HERE, [this, &stun_servers,
&turn_servers, &configuration,
&dependencies] {
RTC_DCHECK_RUN_ON(network_thread());
network_thread_safety_ = PendingTaskSafetyFlag::Create();
InitializePortAllocatorResult pa_result =
InitializePortAllocator_n(stun_servers, turn_servers, configuration);
// Send information about IPv4/IPv6 status.
PeerConnectionAddressFamilyCounter address_family =
pa_result.enable_ipv6 ? kPeerConnection_IPv6 : kPeerConnection_IPv4;
RTC_HISTOGRAM_ENUMERATION("WebRTC.PeerConnection.IPMetrics", address_family,
kPeerConnectionAddressFamilyCounter_Max);
InitializeTransportController_n(configuration, dependencies);
});
config.ice_transport_factory = ice_transport_factory_.get();
config.on_dtls_handshake_error_ =
[weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) {
if (weak_ptr) {
weak_ptr->OnTransportControllerDtlsHandshakeError(s);
}
};
transport_controller_.reset(new JsepTransportController(
signaling_thread(), network_thread(), port_allocator_.get(),
async_resolver_factory_.get(), config));
// The following RTC_DCHECKs are added by looking at the caller thread.
// If this is incorrect there might not be test failures
// due to lack of unit tests which trigger these scenarios.
// TODO(bugs.webrtc.org/12160): Remove above comments.
// callbacks for signaling_thread.
// TODO(bugs.webrtc.org/12427): If we can't piggyback on the above network
// Invoke(), then perhaps we could post these subscription calls to the
// network thread so that the transport controller doesn't have to do the
// signaling/network handling internally and use AsyncInvoker.
transport_controller_->SubscribeIceConnectionState(
[this](cricket::IceConnectionState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerConnectionState(s);
});
transport_controller_->SubscribeConnectionState(
[this](PeerConnectionInterface::PeerConnectionState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
SetConnectionState(s);
});
transport_controller_->SubscribeStandardizedIceConnectionState(
[this](PeerConnectionInterface::IceConnectionState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
SetStandardizedIceConnectionState(s);
});
transport_controller_->SubscribeIceGatheringState(
[this](cricket::IceGatheringState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerGatheringState(s);
});
transport_controller_->SubscribeIceCandidateGathered(
[this](const std::string& transport,
const std::vector<cricket::Candidate>& candidates) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesGathered(transport, candidates);
});
transport_controller_->SubscribeIceCandidateError(
[this](const cricket::IceCandidateErrorEvent& event) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateError(event);
});
transport_controller_->SubscribeIceCandidatesRemoved(
[this](const std::vector<cricket::Candidate>& c) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesRemoved(c);
});
transport_controller_->SubscribeIceCandidatePairChanged(
[this](const cricket::CandidatePairChangeEvent& event) {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateChanged(event);
});
configuration_ = configuration;
transport_controller_->SetIceConfig(ParseIceConfig(configuration));
stats_ = std::make_unique<StatsCollector>(this);
stats_collector_ = RTCStatsCollector::Create(this);
@ -632,125 +716,6 @@ RTCError PeerConnection::Initialize(
return RTCError::OK();
}
void PeerConnection::InitializeTransportController_n(
const RTCConfiguration& configuration,
const PeerConnectionDependencies& dependencies) {
JsepTransportController::Config config;
config.redetermine_role_on_ice_restart =
configuration.redetermine_role_on_ice_restart;
config.ssl_max_version = options_.ssl_max_version;
config.disable_encryption = options_.disable_encryption;
config.bundle_policy = configuration.bundle_policy;
config.rtcp_mux_policy = configuration.rtcp_mux_policy;
// TODO(bugs.webrtc.org/9891) - Remove options_.crypto_options then remove
// this stub.
config.crypto_options = configuration.crypto_options.has_value()
? *configuration.crypto_options
: options_.crypto_options;
config.transport_observer = this;
config.rtcp_handler = InitializeRtcpCallback();
config.event_log = event_log_ptr_;
#if defined(ENABLE_EXTERNAL_AUTH)
config.enable_external_auth = true;
#endif
config.active_reset_srtp_params = configuration.active_reset_srtp_params;
// DTLS has to be enabled to use SCTP.
if (!configuration.enable_rtp_data_channel &&
!options_.disable_sctp_data_channels && dtls_enabled_) {
config.sctp_factory = context_->sctp_transport_factory();
}
config.ice_transport_factory = ice_transport_factory_.get();
config.on_dtls_handshake_error_ =
[weak_ptr = weak_factory_.GetWeakPtr()](rtc::SSLHandshakeError s) {
if (weak_ptr) {
weak_ptr->OnTransportControllerDtlsHandshakeError(s);
}
};
transport_controller_.reset(
new JsepTransportController(network_thread(), port_allocator_.get(),
async_resolver_factory_.get(), config));
transport_controller_->SubscribeIceConnectionState(
[this](cricket::IceConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerConnectionState(s);
}));
});
transport_controller_->SubscribeConnectionState(
[this](PeerConnectionInterface::PeerConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetConnectionState(s);
}));
});
transport_controller_->SubscribeStandardizedIceConnectionState(
[this](PeerConnectionInterface::IceConnectionState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
SetStandardizedIceConnectionState(s);
}));
});
transport_controller_->SubscribeIceGatheringState(
[this](cricket::IceGatheringState s) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, s]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerGatheringState(s);
}));
});
transport_controller_->SubscribeIceCandidateGathered(
[this](const std::string& transport,
const std::vector<cricket::Candidate>& candidates) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(),
[this, t = transport, c = candidates]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesGathered(t, c);
}));
});
transport_controller_->SubscribeIceCandidateError(
[this](const cricket::IceCandidateErrorEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateError(event);
}));
});
transport_controller_->SubscribeIceCandidatesRemoved(
[this](const std::vector<cricket::Candidate>& c) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(
ToQueuedTask(signaling_thread_safety_.flag(), [this, c = c]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidatesRemoved(c);
}));
});
transport_controller_->SubscribeIceCandidatePairChanged(
[this](const cricket::CandidatePairChangeEvent& event) {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(ToQueuedTask(
signaling_thread_safety_.flag(), [this, event = event]() {
RTC_DCHECK_RUN_ON(signaling_thread());
OnTransportControllerCandidateChanged(event);
}));
});
transport_controller_->SetIceConfig(ParseIceConfig(configuration));
}
rtc::scoped_refptr<StreamCollectionInterface> PeerConnection::local_streams() {
RTC_DCHECK_RUN_ON(signaling_thread());
RTC_CHECK(!IsUnifiedPlan()) << "local_streams is not available with Unified "
@ -1475,7 +1440,6 @@ RTCError PeerConnection::SetConfiguration(
if (configuration_.active_reset_srtp_params !=
modified_config.active_reset_srtp_params) {
// TODO(tommi): move to the network thread - this hides an invoke.
transport_controller_->SetActiveResetSrtpParams(
modified_config.active_reset_srtp_params);
}
@ -1630,7 +1594,6 @@ void PeerConnection::StopRtcEventLog() {
rtc::scoped_refptr<DtlsTransportInterface>
PeerConnection::LookupDtlsTransportByMid(const std::string& mid) {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(tommi): Move to the network thread - this hides an invoke.
return transport_controller_->LookupDtlsTransportByMid(mid);
}
@ -1734,12 +1697,13 @@ void PeerConnection::Close() {
port_allocator_->DiscardCandidatePool();
if (network_thread_safety_) {
network_thread_safety_->SetNotAlive();
network_thread_safety_ = nullptr;
}
});
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
worker_thread_safety_->SetNotAlive();
call_safety_.reset();
call_.reset();
// The event log must outlive call (and any other object that uses it).
event_log_.reset();
@ -2180,10 +2144,7 @@ bool PeerConnection::IceRestartPending(const std::string& content_name) const {
}
bool PeerConnection::NeedsIceRestart(const std::string& content_name) const {
return network_thread()->Invoke<bool>(RTC_FROM_HERE, [this, &content_name] {
RTC_DCHECK_RUN_ON(network_thread());
return transport_controller_->NeedsIceRestart(content_name);
});
return transport_controller_->NeedsIceRestart(content_name);
}
void PeerConnection::OnTransportControllerConnectionState(
@ -2526,7 +2487,6 @@ void PeerConnection::OnTransportControllerGatheringState(
}
void PeerConnection::ReportTransportStats() {
rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
std::map<std::string, std::set<cricket::MediaType>>
media_types_by_transport_name;
for (const auto& transceiver : rtp_manager()->transceivers()->List()) {
@ -2548,25 +2508,18 @@ void PeerConnection::ReportTransportStats() {
cricket::MEDIA_TYPE_DATA);
}
// Run the loop that reports the state on the network thread since the
// transport controller requires the stats to be read there (GetStats()).
network_thread()->PostTask(ToQueuedTask(
network_thread_safety_, [this, media_types_by_transport_name = std::move(
media_types_by_transport_name)] {
for (const auto& entry : media_types_by_transport_name) {
const std::string& transport_name = entry.first;
const std::set<cricket::MediaType> media_types = entry.second;
cricket::TransportStats stats;
if (transport_controller_->GetStats(transport_name, &stats)) {
ReportBestConnectionState(stats);
ReportNegotiatedCiphers(dtls_enabled_, stats, media_types);
}
}
}));
for (const auto& entry : media_types_by_transport_name) {
const std::string& transport_name = entry.first;
const std::set<cricket::MediaType> media_types = entry.second;
cricket::TransportStats stats;
if (transport_controller_->GetStats(transport_name, &stats)) {
ReportBestConnectionState(stats);
ReportNegotiatedCiphers(stats, media_types);
}
}
}
// Walk through the ConnectionInfos to gather best connection usage
// for IPv4 and IPv6.
// static (no member state required)
void PeerConnection::ReportBestConnectionState(
const cricket::TransportStats& stats) {
for (const cricket::TransportChannelStats& channel_stats :
@ -2614,12 +2567,10 @@ void PeerConnection::ReportBestConnectionState(
}
}
// static
void PeerConnection::ReportNegotiatedCiphers(
bool dtls_enabled,
const cricket::TransportStats& stats,
const std::set<cricket::MediaType>& media_types) {
if (!dtls_enabled || stats.channel_stats.empty()) {
if (!dtls_enabled_ || stats.channel_stats.empty()) {
return;
}
@ -2770,9 +2721,24 @@ void PeerConnection::RequestUsagePatternReportForTesting() {
std::function<void(const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us)>
PeerConnection::InitializeRtcpCallback() {
RTC_DCHECK_RUN_ON(network_thread());
return [this, flag = worker_thread_safety_](
const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(signaling_thread());
auto flag =
worker_thread()->Invoke<rtc::scoped_refptr<PendingTaskSafetyFlag>>(
RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
if (!call_)
return rtc::scoped_refptr<PendingTaskSafetyFlag>();
if (!call_safety_)
call_safety_.reset(new ScopedTaskSafety());
return call_safety_->flag();
});
if (!flag)
return [](const rtc::CopyOnWriteBuffer&, int64_t) {};
return [this, flag = std::move(flag)](const rtc::CopyOnWriteBuffer& packet,
int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(network_thread());
// TODO(bugs.webrtc.org/11993): We should actually be delivering this call
// directly to the Call class somehow directly on the network thread and not

View File

@ -455,8 +455,7 @@ class PeerConnection : public PeerConnectionInternal,
bool is_unified_plan,
std::unique_ptr<RtcEventLog> event_log,
std::unique_ptr<Call> call,
PeerConnectionDependencies& dependencies,
bool dtls_enabled);
PeerConnectionDependencies& dependencies);
~PeerConnection() override;
@ -464,10 +463,6 @@ class PeerConnection : public PeerConnectionInternal,
RTCError Initialize(
const PeerConnectionInterface::RTCConfiguration& configuration,
PeerConnectionDependencies dependencies);
void InitializeTransportController_n(
const RTCConfiguration& configuration,
const PeerConnectionDependencies& dependencies)
RTC_RUN_ON(network_thread());
rtc::scoped_refptr<RtpTransceiverProxyWithInternal<RtpTransceiver>>
FindTransceiverBySender(rtc::scoped_refptr<RtpSenderInterface> sender)
@ -578,12 +573,11 @@ class PeerConnection : public PeerConnectionInternal,
void ReportTransportStats() RTC_RUN_ON(signaling_thread());
// Gather the usage of IPv4/IPv6 as best connection.
static void ReportBestConnectionState(const cricket::TransportStats& stats);
void ReportBestConnectionState(const cricket::TransportStats& stats);
static void ReportNegotiatedCiphers(
bool dtls_enabled,
const cricket::TransportStats& stats,
const std::set<cricket::MediaType>& media_types);
void ReportNegotiatedCiphers(const cricket::TransportStats& stats,
const std::set<cricket::MediaType>& media_types)
RTC_RUN_ON(signaling_thread());
void ReportIceCandidateCollected(const cricket::Candidate& candidate)
RTC_RUN_ON(signaling_thread());
@ -633,9 +627,8 @@ class PeerConnection : public PeerConnectionInternal,
// TODO(zstein): |async_resolver_factory_| can currently be nullptr if it
// is not injected. It should be required once chromium supplies it.
// This member variable is only used by JsepTransportController so we should
// consider moving ownership to there.
const std::unique_ptr<AsyncResolverFactory> async_resolver_factory_;
const std::unique_ptr<AsyncResolverFactory> async_resolver_factory_
RTC_GUARDED_BY(signaling_thread());
std::unique_ptr<cricket::PortAllocator>
port_allocator_; // TODO(bugs.webrtc.org/9987): Accessed on both
// signaling and network thread.
@ -653,7 +646,8 @@ class PeerConnection : public PeerConnectionInternal,
std::unique_ptr<Call> call_ RTC_GUARDED_BY(worker_thread());
ScopedTaskSafety signaling_thread_safety_;
rtc::scoped_refptr<PendingTaskSafetyFlag> network_thread_safety_;
rtc::scoped_refptr<PendingTaskSafetyFlag> worker_thread_safety_;
std::unique_ptr<ScopedTaskSafety> call_safety_
RTC_GUARDED_BY(worker_thread());
// Points to the same thing as `call_`. Since it's const, we may read the
// pointer from any thread.
@ -687,7 +681,7 @@ class PeerConnection : public PeerConnectionInternal,
std::unique_ptr<SdpOfferAnswerHandler> sdp_handler_
RTC_GUARDED_BY(signaling_thread());
const bool dtls_enabled_;
bool dtls_enabled_ RTC_GUARDED_BY(signaling_thread()) = false;
UsagePattern usage_pattern_ RTC_GUARDED_BY(signaling_thread());
bool return_histogram_very_quickly_ RTC_GUARDED_BY(signaling_thread()) =

View File

@ -2794,7 +2794,7 @@ bool SdpOfferAnswerHandler::IceRestartPending(
bool SdpOfferAnswerHandler::NeedsIceRestart(
const std::string& content_name) const {
return pc_->NeedsIceRestart(content_name);
return transport_controller()->NeedsIceRestart(content_name);
}
absl::optional<rtc::SSLRole> SdpOfferAnswerHandler::GetDtlsRole(

View File

@ -97,7 +97,8 @@ ScenarioIceConnectionImpl::ScenarioIceConnectionImpl(
port_allocator_(
new cricket::BasicPortAllocator(manager_->network_manager())),
jsep_controller_(
new JsepTransportController(network_thread_,
new JsepTransportController(signaling_thread_,
network_thread_,
port_allocator_.get(),
/*async_resolver_factory*/ nullptr,
CreateJsepConfig())) {