Fixes reference counting problem when a TransportProxy points to a Transport prior to creating channels.

Until the TransportProxy enters the "negotiated" state we only create
ChannelImpls but we don't hook up to them. However, we still neeed to
reserve their spot and increment the reference count.

Once we are negotiated we can hook all the ChannelProxy's to the
corresponding ChannelImpls.

This change is needed to implement maxbundle.

BUG=1574
R=pthatcher@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/36789004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@8082 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
decurtis@webrtc.org
2015-01-15 22:53:49 +00:00
parent ef2a5dd398
commit 357469da5a
3 changed files with 67 additions and 64 deletions

View File

@ -46,25 +46,26 @@ TransportChannel* TransportProxy::GetChannel(int component) {
return GetChannelProxy(component); return GetChannelProxy(component);
} }
TransportChannel* TransportProxy::CreateChannel( TransportChannel* TransportProxy::CreateChannel(const std::string& name,
const std::string& name, int component) { int component) {
ASSERT(rtc::Thread::Current() == worker_thread_); ASSERT(rtc::Thread::Current() == worker_thread_);
ASSERT(GetChannel(component) == NULL); ASSERT(GetChannel(component) == NULL);
ASSERT(!transport_->get()->HasChannel(component)); ASSERT(!transport_->get()->HasChannel(component));
// We always create a proxy in case we need to change out the transport later. // We always create a proxy in case we need to change out the transport later.
TransportChannelProxy* channel = TransportChannelProxy* channel_proxy =
new TransportChannelProxy(content_name(), name, component); new TransportChannelProxy(content_name(), name, component);
channels_[component] = channel; channels_[component] = channel_proxy;
// If we're already negotiated, create an impl and hook it up to the proxy // If we're already negotiated, create an impl and hook it up to the proxy
// channel. If we're connecting, create an impl but don't hook it up yet. // channel. If we're connecting, create an impl but don't hook it up yet.
if (negotiated_) { if (negotiated_) {
SetupChannelProxy_w(component, channel); CreateChannelImpl_w(component);
SetChannelImplFromTransport_w(channel_proxy, component);
} else if (connecting_) { } else if (connecting_) {
GetOrCreateChannelProxyImpl_w(component); CreateChannelImpl_w(component);
} }
return channel; return channel_proxy;
} }
bool TransportProxy::HasChannel(int component) { bool TransportProxy::HasChannel(int component) {
@ -73,28 +74,29 @@ bool TransportProxy::HasChannel(int component) {
void TransportProxy::DestroyChannel(int component) { void TransportProxy::DestroyChannel(int component) {
ASSERT(rtc::Thread::Current() == worker_thread_); ASSERT(rtc::Thread::Current() == worker_thread_);
TransportChannel* channel = GetChannel(component); TransportChannelProxy* channel_proxy = GetChannelProxy(component);
if (channel) { if (channel_proxy) {
// If the state of TransportProxy is not NEGOTIATED // If the state of TransportProxy is not NEGOTIATED then
// then TransportChannelProxy and its impl are not // TransportChannelProxy and its impl are not connected. Both must
// connected. Both must be connected before // be connected before deletion.
// deletion. //
if (!negotiated_) { // However, if we haven't entered the connecting state then there
SetupChannelProxy_w(component, GetChannelProxy(component)); // is no implementation to hook up.
if (connecting_ && !negotiated_) {
SetChannelImplFromTransport_w(channel_proxy, component);
} }
channels_.erase(component); channels_.erase(component);
channel->SignalDestroyed(channel); channel_proxy->SignalDestroyed(channel_proxy);
delete channel; delete channel_proxy;
} }
} }
void TransportProxy::ConnectChannels() { void TransportProxy::ConnectChannels() {
if (!connecting_) { if (!connecting_) {
if (!negotiated_) { if (!negotiated_) {
for (ChannelMap::iterator iter = channels_.begin(); for (auto& iter : channels_) {
iter != channels_.end(); ++iter) { CreateChannelImpl(iter.first);
GetOrCreateChannelProxyImpl(iter->first);
} }
} }
connecting_ = true; connecting_ = true;
@ -108,9 +110,14 @@ void TransportProxy::ConnectChannels() {
void TransportProxy::CompleteNegotiation() { void TransportProxy::CompleteNegotiation() {
if (!negotiated_) { if (!negotiated_) {
for (ChannelMap::iterator iter = channels_.begin(); // Negotiating assumes connecting_ has happened and
iter != channels_.end(); ++iter) { // implementations exist. If not we need to create the
SetupChannelProxy(iter->first, iter->second); // implementations.
for (auto& iter : channels_) {
if (!connecting_) {
CreateChannelImpl(iter.first);
}
SetChannelImplFromTransport(iter.second, iter.first);
} }
negotiated_ = true; negotiated_ = true;
} }
@ -169,43 +176,37 @@ TransportChannelProxy* TransportProxy::GetChannelProxyByName(
return NULL; return NULL;
} }
TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl( void TransportProxy::CreateChannelImpl(int component) {
int component) { worker_thread_->Invoke<void>(Bind(
return worker_thread_->Invoke<TransportChannelImpl*>(Bind( &TransportProxy::CreateChannelImpl_w, this, component));
&TransportProxy::GetOrCreateChannelProxyImpl_w, this, component));
} }
TransportChannelImpl* TransportProxy::GetOrCreateChannelProxyImpl_w( void TransportProxy::CreateChannelImpl_w(int component) {
ASSERT(rtc::Thread::Current() == worker_thread_);
transport_->get()->CreateChannel(component);
}
void TransportProxy::SetChannelImplFromTransport(TransportChannelProxy* proxy,
int component) {
worker_thread_->Invoke<void>(Bind(
&TransportProxy::SetChannelImplFromTransport_w, this, proxy, component));
}
void TransportProxy::SetChannelImplFromTransport_w(TransportChannelProxy* proxy,
int component) { int component) {
ASSERT(rtc::Thread::Current() == worker_thread_); ASSERT(rtc::Thread::Current() == worker_thread_);
TransportChannelImpl* impl = transport_->get()->GetChannel(component); TransportChannelImpl* impl = transport_->get()->GetChannel(component);
if (impl == NULL) {
impl = transport_->get()->CreateChannel(component);
}
return impl;
}
void TransportProxy::SetupChannelProxy(
int component, TransportChannelProxy* transproxy) {
worker_thread_->Invoke<void>(Bind(
&TransportProxy::SetupChannelProxy_w, this, component, transproxy));
}
void TransportProxy::SetupChannelProxy_w(
int component, TransportChannelProxy* transproxy) {
ASSERT(rtc::Thread::Current() == worker_thread_);
TransportChannelImpl* impl = GetOrCreateChannelProxyImpl(component);
ASSERT(impl != NULL); ASSERT(impl != NULL);
transproxy->SetImplementation(impl); ReplaceChannelImpl_w(proxy, impl);
} }
void TransportProxy::ReplaceChannelProxyImpl(TransportChannelProxy* proxy, void TransportProxy::ReplaceChannelImpl(TransportChannelProxy* proxy,
TransportChannelImpl* impl) { TransportChannelImpl* impl) {
worker_thread_->Invoke<void>(Bind( worker_thread_->Invoke<void>(Bind(
&TransportProxy::ReplaceChannelProxyImpl_w, this, proxy, impl)); &TransportProxy::ReplaceChannelImpl_w, this, proxy, impl));
} }
void TransportProxy::ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy, void TransportProxy::ReplaceChannelImpl_w(TransportChannelProxy* proxy,
TransportChannelImpl* impl) { TransportChannelImpl* impl) {
ASSERT(rtc::Thread::Current() == worker_thread_); ASSERT(rtc::Thread::Current() == worker_thread_);
ASSERT(proxy != NULL); ASSERT(proxy != NULL);
@ -227,11 +228,11 @@ bool TransportProxy::SetupMux(TransportProxy* target) {
iter != channels_.end(); ++iter) { iter != channels_.end(); ++iter) {
if (!target->transport_->get()->HasChannel(iter->first)) { if (!target->transport_->get()->HasChannel(iter->first)) {
// Remove if channel doesn't exist in |transport_|. // Remove if channel doesn't exist in |transport_|.
ReplaceChannelProxyImpl(iter->second, NULL); ReplaceChannelImpl(iter->second, NULL);
} else { } else {
// Replace the impl for all the TransportProxyChannels with the channels // Replace the impl for all the TransportProxyChannels with the channels
// from |target|'s transport. Fail if there's not an exact match. // from |target|'s transport. Fail if there's not an exact match.
ReplaceChannelProxyImpl( ReplaceChannelImpl(
iter->second, target->transport_->get()->CreateChannel(iter->first)); iter->second, target->transport_->get()->CreateChannel(iter->first));
} }
} }

View File

@ -139,17 +139,18 @@ class TransportProxy : public sigslot::has_slots<>,
TransportChannelProxy* GetChannelProxy(int component) const; TransportChannelProxy* GetChannelProxy(int component) const;
TransportChannelProxy* GetChannelProxyByName(const std::string& name) const; TransportChannelProxy* GetChannelProxyByName(const std::string& name) const;
TransportChannelImpl* GetOrCreateChannelProxyImpl(int component); // Creates a new channel on the Transport which causes the reference
TransportChannelImpl* GetOrCreateChannelProxyImpl_w(int component); // count to increment.
void CreateChannelImpl(int component);
void CreateChannelImpl_w(int component);
// Manipulators of transportchannelimpl in channel proxy. // Manipulators of transportchannelimpl in channel proxy.
void SetupChannelProxy(int component, void SetChannelImplFromTransport(TransportChannelProxy* proxy, int component);
TransportChannelProxy* proxy); void SetChannelImplFromTransport_w(TransportChannelProxy* proxy,
void SetupChannelProxy_w(int component, int component);
TransportChannelProxy* proxy); void ReplaceChannelImpl(TransportChannelProxy* proxy,
void ReplaceChannelProxyImpl(TransportChannelProxy* proxy,
TransportChannelImpl* impl); TransportChannelImpl* impl);
void ReplaceChannelProxyImpl_w(TransportChannelProxy* proxy, void ReplaceChannelImpl_w(TransportChannelProxy* proxy,
TransportChannelImpl* impl); TransportChannelImpl* impl);
rtc::Thread* const worker_thread_; rtc::Thread* const worker_thread_;

View File

@ -33,8 +33,9 @@ TransportChannelProxy::TransportChannelProxy(const std::string& content_name,
TransportChannelProxy::~TransportChannelProxy() { TransportChannelProxy::~TransportChannelProxy() {
// Clearing any pending signal. // Clearing any pending signal.
worker_thread_->Clear(this); worker_thread_->Clear(this);
if (impl_) if (impl_) {
impl_->GetTransport()->DestroyChannel(impl_->component()); impl_->GetTransport()->DestroyChannel(impl_->component());
}
} }
void TransportChannelProxy::SetImplementation(TransportChannelImpl* impl) { void TransportChannelProxy::SetImplementation(TransportChannelImpl* impl) {