diff --git a/webrtc/p2p/base/dtlstransportchannel_unittest.cc b/webrtc/p2p/base/dtlstransportchannel_unittest.cc index 95696e222c..e62563e9b5 100644 --- a/webrtc/p2p/base/dtlstransportchannel_unittest.cc +++ b/webrtc/p2p/base/dtlstransportchannel_unittest.cc @@ -75,8 +75,6 @@ class DtlsTestClient : public sigslot::has_slots<> { transport_->SetIceRole(role); transport_->SetIceTiebreaker( (role == cricket::ICEROLE_CONTROLLING) ? 1 : 2); - transport_->SignalWritableState.connect(this, - &DtlsTestClient::OnTransportWritableState); for (int i = 0; i < count; ++i) { cricket::DtlsTransportChannelWrapper* channel = @@ -193,7 +191,15 @@ class DtlsTestClient : public sigslot::has_slots<> { } bool all_channels_writable() const { - return transport_->all_channels_writable(); + if (channels_.empty()) { + return false; + } + for (cricket::DtlsTransportChannelWrapper* channel : channels_) { + if (!channel->writable()) { + return false; + } + } + return true; } void CheckRole(rtc::SSLRole role) { @@ -313,11 +319,6 @@ class DtlsTestClient : public sigslot::has_slots<> { return (num_matches < ((static_cast(size) - 5) / 10)); } - // Transport callbacks - void OnTransportWritableState(cricket::Transport* transport) { - LOG(LS_INFO) << name_ << ": is writable"; - } - // Transport channel callbacks void OnTransportChannelWritableState(cricket::TransportChannel* channel) { LOG(LS_INFO) << name_ << ": Channel '" << channel->component() diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc index 3e5f1b9f3a..66ba63e8a2 100644 --- a/webrtc/p2p/base/transport.cc +++ b/webrtc/p2p/base/transport.cc @@ -17,7 +17,7 @@ #include "webrtc/p2p/base/port.h" #include "webrtc/p2p/base/transportchannelimpl.h" #include "webrtc/base/bind.h" -#include "webrtc/base/common.h" +#include "webrtc/base/checks.h" #include "webrtc/base/logging.h" namespace cricket { @@ -69,59 +69,22 @@ Transport::Transport(const std::string& name, PortAllocator* allocator) : name_(name), allocator_(allocator) {} Transport::~Transport() { - ASSERT(channels_destroyed_); -} - -bool Transport::AllChannelsCompleted() const { - // We aren't completed until at least one channel is complete, so if there - // are no channels, we aren't complete yet. - if (channels_.empty()) { - LOG(LS_INFO) << name() << " transport is not complete" - << " because it has no TransportChannels"; - return false; - } - - // A Transport's ICE process is completed if all of its channels are writable, - // have finished allocating candidates, and have pruned all but one of their - // connections. - for (const auto& iter : channels_) { - const TransportChannelImpl* channel = iter.second.get(); - bool complete = - channel->writable() && - channel->GetState() == TransportChannelState::STATE_COMPLETED && - channel->GetIceRole() == ICEROLE_CONTROLLING && - channel->gathering_state() == kIceGatheringComplete; - if (!complete) { - LOG(LS_INFO) << name() << " transport is not complete" - << " because a channel is still incomplete."; - return false; - } - } - - return true; -} - -bool Transport::AnyChannelFailed() const { - for (const auto& iter : channels_) { - if (iter.second->GetState() == TransportChannelState::STATE_FAILED) { - return true; - } - } - return false; + RTC_DCHECK(channels_destroyed_); } void Transport::SetIceRole(IceRole role) { ice_role_ = role; - for (auto& iter : channels_) { - iter.second->SetIceRole(ice_role_); + for (const auto& kv : channels_) { + kv.second->SetIceRole(ice_role_); } } bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) { - if (channels_.empty()) + if (channels_.empty()) { return false; + } - ChannelMap::iterator iter = channels_.begin(); + auto iter = channels_.begin(); return iter->second->GetRemoteSSLCertificate(cert); } @@ -155,8 +118,8 @@ bool Transport::SetLocalTransportDescription( local_description_.reset(new TransportDescription(description)); - for (auto& iter : channels_) { - ret &= ApplyLocalTransportDescription(iter.second.get(), error_desc); + for (const auto& kv : channels_) { + ret &= ApplyLocalTransportDescription(kv.second, error_desc); } if (!ret) { return false; @@ -186,8 +149,8 @@ bool Transport::SetRemoteTransportDescription( } remote_description_.reset(new TransportDescription(description)); - for (auto& iter : channels_) { - ret &= ApplyRemoteTransportDescription(iter.second.get(), error_desc); + for (const auto& kv : channels_) { + ret &= ApplyRemoteTransportDescription(kv.second, error_desc); } // If PRANSWER/ANSWER is set, we should decide transport protocol type. @@ -202,67 +165,48 @@ bool Transport::SetRemoteTransportDescription( } TransportChannelImpl* Transport::CreateChannel(int component) { - TransportChannelImpl* impl; + TransportChannelImpl* channel; // Create the entry if it does not exist. - bool impl_exists = false; - auto iterator = channels_.find(component); - if (iterator == channels_.end()) { - impl = CreateTransportChannel(component); - iterator = channels_.insert(std::pair( - component, ChannelMapEntry(impl))).first; + bool channel_exists = false; + auto iter = channels_.find(component); + if (iter == channels_.end()) { + channel = CreateTransportChannel(component); + channels_.insert(std::pair(component, channel)); } else { - impl = iterator->second.get(); - impl_exists = true; + channel = iter->second; + channel_exists = true; } - // Increase the ref count. - iterator->second.AddRef(); channels_destroyed_ = false; - if (impl_exists) { - // If this is an existing channel, we should just return it without - // connecting to all the signal again. - return impl; + if (channel_exists) { + // If this is an existing channel, we should just return it. + return channel; } // Push down our transport state to the new channel. - impl->SetIceRole(ice_role_); - impl->SetIceTiebreaker(tiebreaker_); - impl->SetIceConfig(ice_config_); + channel->SetIceRole(ice_role_); + channel->SetIceTiebreaker(tiebreaker_); + channel->SetIceConfig(ice_config_); // TODO(ronghuawu): Change CreateChannel to be able to return error since // below Apply**Description calls can fail. if (local_description_) - ApplyLocalTransportDescription(impl, NULL); + ApplyLocalTransportDescription(channel, nullptr); if (remote_description_) - ApplyRemoteTransportDescription(impl, NULL); + ApplyRemoteTransportDescription(channel, nullptr); if (local_description_ && remote_description_) - ApplyNegotiatedTransportDescription(impl, NULL); - - impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState); - impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState); - impl->SignalGatheringState.connect(this, &Transport::OnChannelGatheringState); - impl->SignalCandidateGathered.connect(this, - &Transport::OnChannelCandidateGathered); - impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange); - impl->SignalRoleConflict.connect(this, &Transport::OnRoleConflict); - impl->SignalConnectionRemoved.connect( - this, &Transport::OnChannelConnectionRemoved); + ApplyNegotiatedTransportDescription(channel, nullptr); if (connect_requested_) { - impl->Connect(); - if (channels_.size() == 1) { - // If this is the first channel, then indicate that we have started - // connecting. - SignalConnecting(this); - } + channel->Connect(); } - return impl; + return channel; } TransportChannelImpl* Transport::GetChannel(int component) { - ChannelMap::iterator iter = channels_.find(component); - return (iter != channels_.end()) ? iter->second.get() : NULL; + auto iter = channels_.find(component); + return (iter != channels_.end()) ? iter->second : nullptr; } bool Transport::HasChannels() { @@ -270,32 +214,13 @@ bool Transport::HasChannels() { } void Transport::DestroyChannel(int component) { - ChannelMap::iterator iter = channels_.find(component); + auto iter = channels_.find(component); if (iter == channels_.end()) return; - TransportChannelImpl* impl = NULL; - - iter->second.DecRef(); - if (!iter->second.ref()) { - impl = iter->second.get(); - channels_.erase(iter); - } - - if (connect_requested_ && channels_.empty()) { - // We're no longer attempting to connect. - SignalConnecting(this); - } - - if (impl) { - DestroyTransportChannel(impl); - // Need to update aggregate state after destroying a channel, - // for example if it was the only one that wasn't yet writable. - UpdateWritableState(); - UpdateReceivingState(); - UpdateGatheringState(); - MaybeSignalCompleted(); - } + TransportChannelImpl* channel = iter->second; + channels_.erase(iter); + DestroyTransportChannel(channel); } void Transport::ConnectChannels() { @@ -316,14 +241,11 @@ void Transport::ConnectChannels() { TransportDescription desc( std::vector(), rtc::CreateRandomString(ICE_UFRAG_LENGTH), rtc::CreateRandomString(ICE_PWD_LENGTH), ICEMODE_FULL, - CONNECTIONROLE_NONE, NULL, Candidates()); - SetLocalTransportDescription(desc, CA_OFFER, NULL); + CONNECTIONROLE_NONE, nullptr, Candidates()); + SetLocalTransportDescription(desc, CA_OFFER, nullptr); } CallChannels(&TransportChannelImpl::Connect); - if (HasChannels()) { - SignalConnecting(this); - } } void Transport::MaybeStartGathering() { @@ -333,24 +255,16 @@ void Transport::MaybeStartGathering() { } void Transport::DestroyAllChannels() { - std::vector impls; - for (auto& iter : channels_) { - iter.second.DecRef(); - if (!iter.second.ref()) - impls.push_back(iter.second.get()); + for (const auto& kv : channels_) { + DestroyTransportChannel(kv.second); } - channels_.clear(); - - for (TransportChannelImpl* impl : impls) { - DestroyTransportChannel(impl); - } channels_destroyed_ = true; } void Transport::CallChannels(TransportChannelFunc func) { - for (const auto& iter : channels_) { - ((iter.second.get())->*func)(); + for (const auto& kv : channels_) { + (kv.second->*func)(); } } @@ -389,13 +303,13 @@ bool Transport::VerifyCandidate(const Candidate& cand, std::string* error) { bool Transport::GetStats(TransportStats* stats) { stats->transport_name = name(); stats->channel_stats.clear(); - for (auto iter : channels_) { - ChannelMapEntry& entry = iter.second; + for (auto kv : channels_) { + TransportChannelImpl* channel = kv.second; TransportChannelStats substats; - substats.component = entry->component(); - entry->GetSrtpCipher(&substats.srtp_cipher); - entry->GetSslCipher(&substats.ssl_cipher); - if (!entry->GetStats(&substats.connection_infos)) { + substats.component = channel->component(); + channel->GetSrtpCipher(&substats.srtp_cipher); + channel->GetSslCipher(&substats.ssl_cipher); + if (!channel->GetStats(&substats.connection_infos)) { return false; } stats->channel_stats.push_back(substats); @@ -418,170 +332,15 @@ bool Transport::AddRemoteCandidates(const std::vector& candidates, } } - for (std::vector::const_iterator iter = candidates.begin(); - iter != candidates.end(); - ++iter) { - TransportChannelImpl* channel = GetChannel(iter->component()); - if (channel != NULL) { - channel->AddRemoteCandidate(*iter); + for (const Candidate& candidate : candidates) { + TransportChannelImpl* channel = GetChannel(candidate.component()); + if (channel != nullptr) { + channel->AddRemoteCandidate(candidate); } } return true; } -void Transport::OnChannelWritableState(TransportChannel* channel) { - LOG(LS_INFO) << name() << " TransportChannel " << channel->component() - << " writability changed to " << channel->writable() - << ". Check if transport is complete."; - UpdateWritableState(); - MaybeSignalCompleted(); -} - -void Transport::OnChannelReceivingState(TransportChannel* channel) { - UpdateReceivingState(); -} - -TransportState Transport::GetTransportState(TransportStateType state_type) { - bool any = false; - bool all = !channels_.empty(); - for (const auto iter : channels_) { - bool b = false; - switch (state_type) { - case TRANSPORT_WRITABLE_STATE: - b = iter.second->writable(); - break; - case TRANSPORT_RECEIVING_STATE: - b = iter.second->receiving(); - break; - default: - ASSERT(false); - } - any |= b; - all &= b; - } - - if (all) { - return TRANSPORT_STATE_ALL; - } else if (any) { - return TRANSPORT_STATE_SOME; - } - - return TRANSPORT_STATE_NONE; -} - -void Transport::OnChannelGatheringState(TransportChannelImpl* channel) { - ASSERT(channels_.find(channel->component()) != channels_.end()); - UpdateGatheringState(); - if (gathering_state_ == kIceGatheringComplete) { - // If UpdateGatheringState brought us to kIceGatheringComplete, check if - // our connection state is also "Completed". Otherwise, there's no point in - // checking (since it would only produce log messages). - MaybeSignalCompleted(); - } -} - -void Transport::OnChannelCandidateGathered(TransportChannelImpl* channel, - const Candidate& candidate) { - // We should never signal peer-reflexive candidates. - if (candidate.type() == PRFLX_PORT_TYPE) { - ASSERT(false); - return; - } - - ASSERT(connect_requested_); - std::vector candidates; - candidates.push_back(candidate); - SignalCandidatesGathered(this, candidates); -} - -void Transport::OnChannelRouteChange(TransportChannel* channel, - const Candidate& remote_candidate) { - SignalRouteChange(this, remote_candidate.component(), remote_candidate); -} - -void Transport::OnRoleConflict(TransportChannelImpl* channel) { - SignalRoleConflict(); -} - -void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) { - LOG(LS_INFO) << name() << " TransportChannel " << channel->component() - << " connection removed. Check if transport is complete."; - MaybeSignalCompleted(); - - // Check if the state is now Failed. - // Failed is only available in the Controlling ICE role. - if (channel->GetIceRole() != ICEROLE_CONTROLLING) { - return; - } - - // Failed can only occur after candidate gathering has stopped. - if (channel->gathering_state() != kIceGatheringComplete) { - return; - } - - if (channel->GetState() == TransportChannelState::STATE_FAILED) { - // A Transport has failed if any of its channels have no remaining - // connections. - SignalFailed(this); - } -} - -void Transport::MaybeSignalCompleted() { - if (AllChannelsCompleted()) { - LOG(LS_INFO) << name() << " transport is complete" - << " because all the channels are complete."; - SignalCompleted(this); - } - // TODO(deadbeef): Should we do anything if we previously were completed, - // but now are not (if, for example, a new remote candidate is added)? -} - -void Transport::UpdateGatheringState() { - IceGatheringState new_state = kIceGatheringNew; - bool any_gathering = false; - bool all_complete = !channels_.empty(); - for (const auto& kv : channels_) { - any_gathering = - any_gathering || kv.second->gathering_state() != kIceGatheringNew; - all_complete = - all_complete && kv.second->gathering_state() == kIceGatheringComplete; - } - if (all_complete) { - new_state = kIceGatheringComplete; - } else if (any_gathering) { - new_state = kIceGatheringGathering; - } - - if (gathering_state_ != new_state) { - gathering_state_ = new_state; - if (gathering_state_ == kIceGatheringGathering) { - LOG(LS_INFO) << "Transport: " << name_ << ", gathering candidates"; - } else if (gathering_state_ == kIceGatheringComplete) { - LOG(LS_INFO) << "Transport " << name() << " gathering complete."; - } - SignalGatheringState(this); - } -} - -void Transport::UpdateReceivingState() { - TransportState receiving = GetTransportState(TRANSPORT_RECEIVING_STATE); - if (receiving_ != receiving) { - receiving_ = receiving; - SignalReceivingState(this); - } -} - -void Transport::UpdateWritableState() { - TransportState writable = GetTransportState(TRANSPORT_WRITABLE_STATE); - LOG(LS_INFO) << name() << " transport writable state changed? " << writable_ - << " => " << writable; - if (writable_ != writable) { - was_writable_ = (writable_ == TRANSPORT_STATE_ALL); - writable_ = writable; - SignalWritableState(this); - } -} - bool Transport::ApplyLocalTransportDescription(TransportChannelImpl* ch, std::string* error_desc) { ch->SetIceCredentials(local_description_->ice_ufrag, @@ -623,9 +382,10 @@ bool Transport::NegotiateTransportDescription(ContentAction local_role, // between future SetRemote/SetLocal invocations and new channel // creation, we have the negotiation state saved until a new // negotiation happens. - for (auto& iter : channels_) { - if (!ApplyNegotiatedTransportDescription(iter.second.get(), error_desc)) + for (const auto& kv : channels_) { + if (!ApplyNegotiatedTransportDescription(kv.second, error_desc)) { return false; + } } return true; } diff --git a/webrtc/p2p/base/transport.h b/webrtc/p2p/base/transport.h index 10b289ce20..a3daa39fbd 100644 --- a/webrtc/p2p/base/transport.h +++ b/webrtc/p2p/base/transport.h @@ -65,21 +65,6 @@ enum IceGatheringState { kIceGatheringComplete, }; -// For "writable" and "receiving", we need to differentiate between -// none, all, and some. -enum TransportState { - TRANSPORT_STATE_NONE = 0, - TRANSPORT_STATE_SOME, - TRANSPORT_STATE_ALL -}; - -// When checking transport state, we need to differentiate between -// "writable" or "receiving" check. -enum TransportStateType { - TRANSPORT_WRITABLE_STATE = 0, - TRANSPORT_RECEIVING_STATE -}; - // Stats that we can return about the connections for a transport channel. // TODO(hta): Rename to ConnectionStats struct ConnectionInfo { @@ -165,37 +150,10 @@ class Transport : public sigslot::has_slots<> { // Returns the port allocator object for this transport. PortAllocator* port_allocator() { return allocator_; } - // Returns the states of this manager. These bits are the ORs - // of the corresponding bits on the managed channels. Each time one of these - // states changes, a signal is raised. - // TODO(honghaiz): Replace uses of writable() with any_channels_writable(). - bool writable() const { return any_channels_writable(); } - bool was_writable() const { return was_writable_; } - bool any_channels_writable() const { - return (writable_ == TRANSPORT_STATE_SOME || - writable_ == TRANSPORT_STATE_ALL); - } - bool all_channels_writable() const { - return (writable_ == TRANSPORT_STATE_ALL); - } - bool any_channel_receiving() const { - return (receiving_ == TRANSPORT_STATE_SOME || - receiving_ == TRANSPORT_STATE_ALL); - } bool ready_for_remote_candidates() const { return local_description_set_ && remote_description_set_; } - bool AllChannelsCompleted() const; - bool AnyChannelFailed() const; - - IceGatheringState gathering_state() const { return gathering_state_; } - - sigslot::signal1 SignalWritableState; - sigslot::signal1 SignalReceivingState; - sigslot::signal1 SignalCompleted; - sigslot::signal1 SignalFailed; - // Returns whether the client has requested the channels to connect. bool connect_requested() const { return connect_requested_; } @@ -229,6 +187,7 @@ class Transport : public sigslot::has_slots<> { return (NULL != GetChannel(component)); } bool HasChannels(); + void DestroyChannel(int component); // Set the local TransportDescription to be used by TransportChannels. @@ -241,10 +200,8 @@ class Transport : public sigslot::has_slots<> { ContentAction action, std::string* error_desc); - // Tells all current and future channels to start connecting. When the first - // channel begins connecting, the following signal is raised. + // Tells all current and future channels to start connecting. void ConnectChannels(); - sigslot::signal1 SignalConnecting; // Tells channels to start gathering candidates if necessary. // Should be called after ConnectChannels() has been called at least once, @@ -260,12 +217,6 @@ class Transport : public sigslot::has_slots<> { bool GetStats(TransportStats* stats); - sigslot::signal1 SignalGatheringState; - - // Handles sending of ready candidates and receiving of remote candidates. - sigslot::signal2&> - SignalCandidatesGathered; - // Called when one or more candidates are ready from the remote peer. bool AddRemoteCandidates(const std::vector& candidates, std::string* error); @@ -275,14 +226,6 @@ class Transport : public sigslot::has_slots<> { virtual bool VerifyCandidate(const Candidate& candidate, std::string* error); - // Signals when the best connection for a channel changes. - sigslot::signal3 SignalRouteChange; - - // Forwards the signal from TransportChannel to BaseSession. - sigslot::signal0<> SignalRoleConflict; - virtual bool GetSslRole(rtc::SSLRole* ssl_role) const { return false; } // Must be called before channel is starting to connect. @@ -335,74 +278,16 @@ class Transport : public sigslot::has_slots<> { std::string* error_desc); private: - struct ChannelMapEntry { - ChannelMapEntry() : impl_(NULL), ref_(0) {} - explicit ChannelMapEntry(TransportChannelImpl *impl) - : impl_(impl), - ref_(0) { - } - - void AddRef() { ++ref_; } - void DecRef() { - ASSERT(ref_ > 0); - --ref_; - } - int ref() const { return ref_; } - - TransportChannelImpl* get() const { return impl_; } - TransportChannelImpl* operator->() const { return impl_; } - - private: - TransportChannelImpl* impl_; - int ref_; - }; - - // Candidate component => ChannelMapEntry - typedef std::map ChannelMap; - - // Called when the write state of a channel changes. - void OnChannelWritableState(TransportChannel* channel); - - // Called when the receiving state of a channel changes. - void OnChannelReceivingState(TransportChannel* channel); - - // Called when a channel starts finishes gathering candidates - void OnChannelGatheringState(TransportChannelImpl* channel); - - // Called when a candidate is ready from channel. - void OnChannelCandidateGathered(TransportChannelImpl* channel, - const Candidate& candidate); - void OnChannelRouteChange(TransportChannel* channel, - const Candidate& remote_candidate); - // Called when there is ICE role change. - void OnRoleConflict(TransportChannelImpl* channel); - // Called when the channel removes a connection. - void OnChannelConnectionRemoved(TransportChannelImpl* channel); + // Candidate component => TransportChannelImpl* + typedef std::map ChannelMap; // Helper function that invokes the given function on every channel. typedef void (TransportChannelImpl::* TransportChannelFunc)(); void CallChannels(TransportChannelFunc func); - // Computes the AND and OR of the channel's read/write/receiving state - // (argument picks the operation). - TransportState GetTransportState(TransportStateType type); - - // Sends SignalCompleted if we are now in that state. - void MaybeSignalCompleted(); - - // Sends SignalGatheringState if gathering state changed - void UpdateGatheringState(); - - void UpdateWritableState(); - void UpdateReceivingState(); - const std::string name_; PortAllocator* const allocator_; bool channels_destroyed_ = false; - TransportState readable_ = TRANSPORT_STATE_NONE; - TransportState writable_ = TRANSPORT_STATE_NONE; - TransportState receiving_ = TRANSPORT_STATE_NONE; - bool was_writable_ = false; bool connect_requested_ = false; IceRole ice_role_ = ICEROLE_UNKNOWN; uint64 tiebreaker_ = 0; @@ -412,7 +297,6 @@ class Transport : public sigslot::has_slots<> { rtc::scoped_ptr remote_description_; bool local_description_set_ = false; bool remote_description_set_ = false; - IceGatheringState gathering_state_ = kIceGatheringNew; ChannelMap channels_; diff --git a/webrtc/p2p/base/transport_unittest.cc b/webrtc/p2p/base/transport_unittest.cc index 9febfe33f7..1f66a47c99 100644 --- a/webrtc/p2p/base/transport_unittest.cc +++ b/webrtc/p2p/base/transport_unittest.cc @@ -34,15 +34,7 @@ class TransportTest : public testing::Test, public sigslot::has_slots<> { public: TransportTest() - : transport_(new FakeTransport("test content name")), - channel_(NULL), - connecting_signalled_(false), - completed_(false), - failed_(false) { - transport_->SignalConnecting.connect(this, &TransportTest::OnConnecting); - transport_->SignalCompleted.connect(this, &TransportTest::OnCompleted); - transport_->SignalFailed.connect(this, &TransportTest::OnFailed); - } + : transport_(new FakeTransport("test content name")), channel_(NULL) {} ~TransportTest() { transport_->DestroyAllChannels(); } @@ -60,30 +52,10 @@ class TransportTest : public testing::Test, } protected: - void OnConnecting(Transport* transport) { - connecting_signalled_ = true; - } - void OnCompleted(Transport* transport) { - completed_ = true; - } - void OnFailed(Transport* transport) { - failed_ = true; - } - rtc::scoped_ptr transport_; FakeTransportChannel* channel_; - bool connecting_signalled_; - bool completed_; - bool failed_; }; -// Test that calling ConnectChannels triggers an OnConnecting signal. -TEST_F(TransportTest, TestConnectChannelsDoesSignal) { - EXPECT_TRUE(SetupChannel()); - transport_->ConnectChannels(); - EXPECT_TRUE(connecting_signalled_); -} - // This test verifies channels are created with proper ICE // role, tiebreaker and remote ice mode and credentials after offer and // answer negotiations. @@ -200,41 +172,6 @@ TEST_F(TransportTest, TestIceControllingOnIceRestartIfRemoteIsIceLite) { EXPECT_EQ(cricket::ICEROLE_CONTROLLING, channel_->GetIceRole()); } -// This test verifies that the Completed and Failed states can be reached. -TEST_F(TransportTest, TestChannelCompletedAndFailed) { - transport_->SetIceRole(cricket::ICEROLE_CONTROLLING); - cricket::TransportDescription local_desc(kIceUfrag1, kIcePwd1); - ASSERT_TRUE(transport_->SetLocalTransportDescription(local_desc, - cricket::CA_OFFER, - NULL)); - EXPECT_TRUE(SetupChannel()); - - cricket::TransportDescription remote_desc(kIceUfrag1, kIcePwd1); - ASSERT_TRUE(transport_->SetRemoteTransportDescription(remote_desc, - cricket::CA_ANSWER, - NULL)); - - channel_->SetConnectionCount(2); - channel_->SetCandidatesGatheringComplete(); - channel_->SetWritable(true); - EXPECT_TRUE_WAIT(transport_->all_channels_writable(), 100); - // ICE is not yet completed because there is still more than one connection. - EXPECT_FALSE(completed_); - EXPECT_FALSE(failed_); - - // When the connection count drops to 1, SignalCompleted should be emitted, - // and completed() should be true. - channel_->SetConnectionCount(1); - EXPECT_TRUE_WAIT(completed_, 100); - completed_ = false; - - // When the connection count drops to 0, SignalFailed should be emitted, and - // completed() should be false. - channel_->SetConnectionCount(0); - EXPECT_TRUE_WAIT(failed_, 100); - EXPECT_FALSE(completed_); -} - // Tests channel role is reversed after receiving ice-lite from remote. TEST_F(TransportTest, TestSetRemoteIceLiteInOffer) { transport_->SetIceRole(cricket::ICEROLE_CONTROLLED); @@ -293,23 +230,3 @@ TEST_F(TransportTest, TestGetStats) { EXPECT_EQ(1, stats.channel_stats[0].component); } -TEST_F(TransportTest, TestReceivingStateChange) { - ASSERT_TRUE(SetupChannel()); - channel_->SetConnectionCount(1); - transport_->ConnectChannels(); - EXPECT_FALSE(transport_->any_channel_receiving()); - - channel_->SetReceiving(true); - EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100); - FakeTransportChannel* channel2 = CreateChannel(2); - channel2->SetReceiving(true); - EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100); - - channel2->SetReceiving(false); - EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100); - - // After both channels become not receiving, the transport receiving state - // becomes TRANSPORT_STATE_NONE. - channel_->SetReceiving(false); - EXPECT_TRUE_WAIT(!transport_->any_channel_receiving(), 100); -} diff --git a/webrtc/p2p/base/transportcontroller.cc b/webrtc/p2p/base/transportcontroller.cc index d84d574f15..22b827a1a5 100644 --- a/webrtc/p2p/base/transportcontroller.cc +++ b/webrtc/p2p/base/transportcontroller.cc @@ -10,11 +10,14 @@ #include "webrtc/p2p/base/transportcontroller.h" +#include + #include "webrtc/base/bind.h" #include "webrtc/base/checks.h" #include "webrtc/base/thread.h" #include "webrtc/p2p/base/dtlstransport.h" #include "webrtc/p2p/base/p2ptransport.h" +#include "webrtc/p2p/base/port.h" namespace cricket { @@ -140,8 +143,32 @@ TransportChannel* TransportController::CreateTransportChannel_w( int component) { RTC_DCHECK(worker_thread_->IsCurrent()); + auto it = FindChannel_w(transport_name, component); + if (it != channels_.end()) { + // Channel already exists; increment reference count and return. + it->AddRef(); + return it->get(); + } + + // Need to create a new channel. Transport* transport = GetOrCreateTransport_w(transport_name); - return transport->CreateChannel(component); + TransportChannelImpl* channel = transport->CreateChannel(component); + channel->SignalWritableState.connect( + this, &TransportController::OnChannelWritableState_w); + channel->SignalReceivingState.connect( + this, &TransportController::OnChannelReceivingState_w); + channel->SignalGatheringState.connect( + this, &TransportController::OnChannelGatheringState_w); + channel->SignalCandidateGathered.connect( + this, &TransportController::OnChannelCandidateGathered_w); + channel->SignalRoleConflict.connect( + this, &TransportController::OnChannelRoleConflict_w); + channel->SignalConnectionRemoved.connect( + this, &TransportController::OnChannelConnectionRemoved_w); + channels_.insert(channels_.end(), RefCountedChannel(channel))->AddRef(); + // Adding a channel could cause aggregate state to change. + UpdateAggregateStates_w(); + return channel; } void TransportController::DestroyTransportChannel_w( @@ -149,18 +176,29 @@ void TransportController::DestroyTransportChannel_w( int component) { RTC_DCHECK(worker_thread_->IsCurrent()); - Transport* transport = GetTransport_w(transport_name); - if (!transport) { - ASSERT(false); + auto it = FindChannel_w(transport_name, component); + if (it == channels_.end()) { + LOG(LS_WARNING) << "Attempting to delete " << transport_name + << " TransportChannel " << component + << ", which doesn't exist."; return; } - transport->DestroyChannel(component); + it->DecRef(); + if (it->ref() > 0) { + return; + } + + channels_.erase(it); + Transport* transport = GetTransport_w(transport_name); + transport->DestroyChannel(component); // Just as we create a Transport when its first channel is created, // we delete it when its last channel is deleted. if (!transport->HasChannels()) { DestroyTransport_w(transport_name); } + // Removing a channel could cause aggregate state to change. + UpdateAggregateStates_w(); } const rtc::scoped_refptr& @@ -221,6 +259,17 @@ void TransportController::OnMessage(rtc::Message* pmsg) { } } +std::vector::iterator +TransportController::FindChannel_w(const std::string& transport_name, + int component) { + return std::find_if( + channels_.begin(), channels_.end(), + [transport_name, component](const RefCountedChannel& channel) { + return channel->transport_name() == transport_name && + channel->component() == component; + }); +} + Transport* TransportController::GetOrCreateTransport_w( const std::string& transport_name) { RTC_DCHECK(worker_thread_->IsCurrent()); @@ -240,22 +289,6 @@ Transport* TransportController::GetOrCreateTransport_w( if (certificate_) { transport->SetLocalCertificate(certificate_); } - transport->SignalConnecting.connect( - this, &TransportController::OnTransportConnecting_w); - transport->SignalWritableState.connect( - this, &TransportController::OnTransportWritableState_w); - transport->SignalReceivingState.connect( - this, &TransportController::OnTransportReceivingState_w); - transport->SignalCompleted.connect( - this, &TransportController::OnTransportCompleted_w); - transport->SignalFailed.connect(this, - &TransportController::OnTransportFailed_w); - transport->SignalGatheringState.connect( - this, &TransportController::OnTransportGatheringState_w); - transport->SignalCandidatesGathered.connect( - this, &TransportController::OnTransportCandidatesGathered_w); - transport->SignalRoleConflict.connect( - this, &TransportController::OnTransportRoleConflict_w); transports_[transport_name] = transport; return transport; @@ -270,8 +303,6 @@ void TransportController::DestroyTransport_w( delete iter->second; transports_.erase(transport_name); } - // Destroying a transport may cause aggregate state to change. - UpdateAggregateStates_w(); } void TransportController::DestroyAllTransports_w() { @@ -447,49 +478,49 @@ bool TransportController::GetStats_w(const std::string& transport_name, return transport->GetStats(stats); } -void TransportController::OnTransportConnecting_w(Transport* transport) { +void TransportController::OnChannelWritableState_w(TransportChannel* channel) { + RTC_DCHECK(worker_thread_->IsCurrent()); + LOG(LS_INFO) << channel->transport_name() << " TransportChannel " + << channel->component() << " writability changed to " + << channel->writable() << "."; + UpdateAggregateStates_w(); +} + +void TransportController::OnChannelReceivingState_w(TransportChannel* channel) { RTC_DCHECK(worker_thread_->IsCurrent()); UpdateAggregateStates_w(); } -void TransportController::OnTransportWritableState_w(Transport* transport) { +void TransportController::OnChannelGatheringState_w( + TransportChannelImpl* channel) { RTC_DCHECK(worker_thread_->IsCurrent()); UpdateAggregateStates_w(); } -void TransportController::OnTransportReceivingState_w(Transport* transport) { +void TransportController::OnChannelCandidateGathered_w( + TransportChannelImpl* channel, + const Candidate& candidate) { RTC_DCHECK(worker_thread_->IsCurrent()); - UpdateAggregateStates_w(); -} -void TransportController::OnTransportCompleted_w(Transport* transport) { - RTC_DCHECK(worker_thread_->IsCurrent()); - UpdateAggregateStates_w(); -} - -void TransportController::OnTransportFailed_w(Transport* transport) { - RTC_DCHECK(worker_thread_->IsCurrent()); - UpdateAggregateStates_w(); -} - -void TransportController::OnTransportGatheringState_w(Transport* transport) { - RTC_DCHECK(worker_thread_->IsCurrent()); - UpdateAggregateStates_w(); -} - -void TransportController::OnTransportCandidatesGathered_w( - Transport* transport, - const std::vector& candidates) { - RTC_DCHECK(worker_thread_->IsCurrent()); - CandidatesData* data = new CandidatesData(transport->name(), candidates); + // We should never signal peer-reflexive candidates. + if (candidate.type() == PRFLX_PORT_TYPE) { + RTC_DCHECK(false); + return; + } + std::vector candidates; + candidates.push_back(candidate); + CandidatesData* data = + new CandidatesData(channel->transport_name(), candidates); signaling_thread_->Post(this, MSG_CANDIDATESGATHERED, data); } -void TransportController::OnTransportRoleConflict_w() { +void TransportController::OnChannelRoleConflict_w( + TransportChannelImpl* channel) { RTC_DCHECK(worker_thread_->IsCurrent()); if (ice_role_switch_) { - LOG(LS_WARNING) << "Repeat of role conflict signal from Transport."; + LOG(LS_WARNING) + << "Repeat of role conflict signal from TransportChannelImpl."; return; } @@ -502,6 +533,15 @@ void TransportController::OnTransportRoleConflict_w() { } } +void TransportController::OnChannelConnectionRemoved_w( + TransportChannelImpl* channel) { + RTC_DCHECK(worker_thread_->IsCurrent()); + LOG(LS_INFO) << channel->transport_name() << " TransportChannel " + << channel->component() + << " connection removed. Check if state is complete."; + UpdateAggregateStates_w(); +} + void TransportController::UpdateAggregateStates_w() { RTC_DCHECK(worker_thread_->IsCurrent()); @@ -509,24 +549,24 @@ void TransportController::UpdateAggregateStates_w() { IceGatheringState new_gathering_state = kIceGatheringNew; bool any_receiving = false; bool any_failed = false; - bool all_connected = HasChannels_w(); - bool all_completed = HasChannels_w(); + bool all_connected = !channels_.empty(); + bool all_completed = !channels_.empty(); bool any_gathering = false; - bool all_done_gathering = HasChannels_w(); - for (const auto& kv : transports_) { - // Ignore transports without channels since they're about to be deleted, - // and their state is meaningless. - if (!kv.second->HasChannels()) { - continue; - } - any_receiving = any_receiving || kv.second->any_channel_receiving(); - any_failed = any_failed || kv.second->AnyChannelFailed(); - all_connected = all_connected && kv.second->all_channels_writable(); - all_completed = all_completed && kv.second->AllChannelsCompleted(); + bool all_done_gathering = !channels_.empty(); + for (const auto& channel : channels_) { + any_receiving = any_receiving || channel->receiving(); + any_failed = any_failed || + channel->GetState() == TransportChannelState::STATE_FAILED; + all_connected = all_connected && channel->writable(); + all_completed = + all_completed && channel->writable() && + channel->GetState() == TransportChannelState::STATE_COMPLETED && + channel->GetIceRole() == ICEROLE_CONTROLLING && + channel->gathering_state() == kIceGatheringComplete; any_gathering = - any_gathering || kv.second->gathering_state() != kIceGatheringNew; + any_gathering || channel->gathering_state() != kIceGatheringNew; all_done_gathering = all_done_gathering && - kv.second->gathering_state() == kIceGatheringComplete; + channel->gathering_state() == kIceGatheringComplete; } if (any_failed) { @@ -562,13 +602,4 @@ void TransportController::UpdateAggregateStates_w() { } } -bool TransportController::HasChannels_w() { - for (const auto& kv : transports_) { - if (kv.second->HasChannels()) { - return true; - } - } - return false; -} - } // namespace cricket diff --git a/webrtc/p2p/base/transportcontroller.h b/webrtc/p2p/base/transportcontroller.h index f506e01bcf..45fcfeac8a 100644 --- a/webrtc/p2p/base/transportcontroller.h +++ b/webrtc/p2p/base/transportcontroller.h @@ -81,9 +81,14 @@ class TransportController : public sigslot::has_slots<>, bool ReadyForRemoteCandidates(const std::string& transport_name); bool GetStats(const std::string& transport_name, TransportStats* stats); + // Creates a channel if it doesn't exist. Otherwise, increments a reference + // count and returns an existing channel. virtual TransportChannel* CreateTransportChannel_w( const std::string& transport_name, int component); + + // Decrements a channel's reference count, and destroys the channel if + // nothing is referencing it. virtual void DestroyTransportChannel_w(const std::string& transport_name, int component); @@ -121,6 +126,33 @@ class TransportController : public sigslot::has_slots<>, private: void OnMessage(rtc::Message* pmsg) override; + // It's the Transport that's currently responsible for creating/destroying + // channels, but the TransportController keeps track of how many external + // objects (BaseChannels) reference each channel. + struct RefCountedChannel { + RefCountedChannel() : impl_(nullptr), ref_(0) {} + explicit RefCountedChannel(TransportChannelImpl* impl) + : impl_(impl), ref_(0) {} + + void AddRef() { ++ref_; } + void DecRef() { + ASSERT(ref_ > 0); + --ref_; + } + int ref() const { return ref_; } + + TransportChannelImpl* get() const { return impl_; } + TransportChannelImpl* operator->() const { return impl_; } + + private: + TransportChannelImpl* impl_; + int ref_; + }; + + std::vector::iterator FindChannel_w( + const std::string& transport_name, + int component); + Transport* GetOrCreateTransport_w(const std::string& transport_name); void DestroyTransport_w(const std::string& transport_name); void DestroyAllTransports_w(); @@ -152,29 +184,27 @@ class TransportController : public sigslot::has_slots<>, bool GetStats_w(const std::string& transport_name, TransportStats* stats); // Handlers for signals from Transport. - void OnTransportConnecting_w(Transport* transport); - void OnTransportWritableState_w(Transport* transport); - void OnTransportReceivingState_w(Transport* transport); - void OnTransportCompleted_w(Transport* transport); - void OnTransportFailed_w(Transport* transport); - void OnTransportGatheringState_w(Transport* transport); - void OnTransportCandidatesGathered_w( - Transport* transport, - const std::vector& candidates); - void OnTransportRoleConflict_w(); + void OnChannelWritableState_w(TransportChannel* channel); + void OnChannelReceivingState_w(TransportChannel* channel); + void OnChannelGatheringState_w(TransportChannelImpl* channel); + void OnChannelCandidateGathered_w(TransportChannelImpl* channel, + const Candidate& candidate); + void OnChannelRoleConflict_w(TransportChannelImpl* channel); + void OnChannelConnectionRemoved_w(TransportChannelImpl* channel); void UpdateAggregateStates_w(); - bool HasChannels_w(); rtc::Thread* const signaling_thread_ = nullptr; rtc::Thread* const worker_thread_ = nullptr; typedef std::map TransportMap; TransportMap transports_; + std::vector channels_; + PortAllocator* const port_allocator_ = nullptr; rtc::SSLProtocolVersion ssl_max_version_ = rtc::SSL_PROTOCOL_DTLS_10; - // Aggregate state for Transports + // Aggregate state for TransportChannelImpls. IceConnectionState connection_state_ = kIceConnectionConnecting; bool receiving_ = false; IceGatheringState gathering_state_ = kIceGatheringNew;