Use AsyncInvoker in PeerConnection instead of MessageHandler

Bug: webrtc:9702
Change-Id: I89d66d1165a096601aed37b8febad60620073899
Reviewed-on: https://webrtc-review.googlesource.com/97180
Reviewed-by: Seth Hampson <shampson@webrtc.org>
Commit-Queue: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24515}
This commit is contained in:
Steve Anton
2018-08-31 11:57:02 -07:00
committed by Commit Bot
parent ed1f75ab6d
commit bb19276a32
2 changed files with 43 additions and 108 deletions

View File

@ -101,43 +101,8 @@ static const char kDefaultVideoSenderId[] = "defaultv0";
// The length of RTCP CNAMEs. // The length of RTCP CNAMEs.
static const int kRtcpCnameLength = 16; static const int kRtcpCnameLength = 16;
enum {
MSG_SET_SESSIONDESCRIPTION_SUCCESS = 0,
MSG_SET_SESSIONDESCRIPTION_FAILED,
MSG_CREATE_SESSIONDESCRIPTION_FAILED,
MSG_GETSTATS,
MSG_FREE_DATACHANNELS,
MSG_REPORT_USAGE_PATTERN,
};
static const int REPORT_USAGE_PATTERN_DELAY_MS = 60000; static const int REPORT_USAGE_PATTERN_DELAY_MS = 60000;
struct SetSessionDescriptionMsg : public rtc::MessageData {
explicit SetSessionDescriptionMsg(
webrtc::SetSessionDescriptionObserver* observer)
: observer(observer) {}
rtc::scoped_refptr<webrtc::SetSessionDescriptionObserver> observer;
RTCError error;
};
struct CreateSessionDescriptionMsg : public rtc::MessageData {
explicit CreateSessionDescriptionMsg(
webrtc::CreateSessionDescriptionObserver* observer)
: observer(observer) {}
rtc::scoped_refptr<webrtc::CreateSessionDescriptionObserver> observer;
RTCError error;
};
struct GetStatsMsg : public rtc::MessageData {
GetStatsMsg(webrtc::StatsObserver* observer,
webrtc::MediaStreamTrackInterface* track)
: observer(observer), track(track) {}
rtc::scoped_refptr<webrtc::StatsObserver> observer;
rtc::scoped_refptr<webrtc::MediaStreamTrackInterface> track;
};
// Check if we can send |new_stream| on a PeerConnection. // Check if we can send |new_stream| on a PeerConnection.
bool CanAddLocalMediaStream(webrtc::StreamCollectionInterface* current_streams, bool CanAddLocalMediaStream(webrtc::StreamCollectionInterface* current_streams,
webrtc::MediaStreamInterface* new_stream) { webrtc::MediaStreamInterface* new_stream) {
@ -1028,8 +993,9 @@ bool PeerConnection::Initialize(
} }
int delay_ms = int delay_ms =
return_histogram_very_quickly_ ? 0 : REPORT_USAGE_PATTERN_DELAY_MS; return_histogram_very_quickly_ ? 0 : REPORT_USAGE_PATTERN_DELAY_MS;
signaling_thread()->PostDelayed(RTC_FROM_HERE, delay_ms, this, async_invoker_.AsyncInvokeDelayed<void>(RTC_FROM_HERE, signaling_thread(),
MSG_REPORT_USAGE_PATTERN, nullptr); [this] { ReportUsagePattern(); },
delay_ms);
return true; return true;
} }
@ -1582,8 +1548,16 @@ bool PeerConnection::GetStats(StatsObserver* observer,
<< track->id(); << track->id();
return false; return false;
} }
signaling_thread()->Post(RTC_FROM_HERE, this, MSG_GETSTATS, // Need to capture |observer| and |track| in scoped_refptrs to ensure they
new GetStatsMsg(observer, track)); // live long enough.
rtc::scoped_refptr<StatsObserver> observer_refptr(observer);
rtc::scoped_refptr<MediaStreamTrackInterface> track_refptr(track);
async_invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread(),
[this, observer_refptr, track_refptr] {
StatsReports reports;
stats_->GetStats(track_refptr, &reports);
observer_refptr->OnComplete(reports);
});
return true; return true;
} }
@ -1912,9 +1886,9 @@ void PeerConnection::SetLocalDescription(
PostSetSessionDescriptionSuccess(observer); PostSetSessionDescriptionSuccess(observer);
// MaybeStartGathering needs to be called after posting // MaybeStartGathering needs to be called after posting OnSuccess to the
// MSG_SET_SESSIONDESCRIPTION_SUCCESS, so that we don't signal any candidates // SetSessionDescriptionObserver so that we don't signal any candidates before
// before signaling that SetLocalDescription completed. // signaling that SetLocalDescription completed.
transport_controller_->MaybeStartGathering(); transport_controller_->MaybeStartGathering();
if (local_description()->GetType() == SdpType::kAnswer) { if (local_description()->GetType() == SdpType::kAnswer) {
@ -3237,51 +3211,6 @@ void PeerConnection::Close() {
observer_ = nullptr; observer_ = nullptr;
} }
void PeerConnection::OnMessage(rtc::Message* msg) {
switch (msg->message_id) {
case MSG_SET_SESSIONDESCRIPTION_SUCCESS: {
SetSessionDescriptionMsg* param =
static_cast<SetSessionDescriptionMsg*>(msg->pdata);
param->observer->OnSuccess();
delete param;
break;
}
case MSG_SET_SESSIONDESCRIPTION_FAILED: {
SetSessionDescriptionMsg* param =
static_cast<SetSessionDescriptionMsg*>(msg->pdata);
param->observer->OnFailure(std::move(param->error));
delete param;
break;
}
case MSG_CREATE_SESSIONDESCRIPTION_FAILED: {
CreateSessionDescriptionMsg* param =
static_cast<CreateSessionDescriptionMsg*>(msg->pdata);
param->observer->OnFailure(std::move(param->error));
delete param;
break;
}
case MSG_GETSTATS: {
GetStatsMsg* param = static_cast<GetStatsMsg*>(msg->pdata);
StatsReports reports;
stats_->GetStats(param->track, &reports);
param->observer->OnComplete(reports);
delete param;
break;
}
case MSG_FREE_DATACHANNELS: {
sctp_data_channels_to_free_.clear();
break;
}
case MSG_REPORT_USAGE_PATTERN: {
ReportUsagePattern();
break;
}
default:
RTC_NOTREACHED() << "Not implemented";
break;
}
}
cricket::VoiceMediaChannel* PeerConnection::voice_media_channel() const { cricket::VoiceMediaChannel* PeerConnection::voice_media_channel() const {
RTC_DCHECK(!IsUnifiedPlan()); RTC_DCHECK(!IsUnifiedPlan());
auto* voice_channel = static_cast<cricket::VoiceChannel*>( auto* voice_channel = static_cast<cricket::VoiceChannel*>(
@ -3554,29 +3483,37 @@ void PeerConnection::OnVideoTrackRemoved(VideoTrackInterface* track,
void PeerConnection::PostSetSessionDescriptionSuccess( void PeerConnection::PostSetSessionDescriptionSuccess(
SetSessionDescriptionObserver* observer) { SetSessionDescriptionObserver* observer) {
SetSessionDescriptionMsg* msg = new SetSessionDescriptionMsg(observer); async_invoker_.AsyncInvoke<void>(
signaling_thread()->Post(RTC_FROM_HERE, this, RTC_FROM_HERE, signaling_thread(),
MSG_SET_SESSIONDESCRIPTION_SUCCESS, msg); rtc::Bind(&SetSessionDescriptionObserver::OnSuccess, observer));
} }
void PeerConnection::PostSetSessionDescriptionFailure( void PeerConnection::PostSetSessionDescriptionFailure(
SetSessionDescriptionObserver* observer, SetSessionDescriptionObserver* observer,
RTCError&& error) { RTCError error) {
RTC_DCHECK(!error.ok()); RTC_DCHECK(!error.ok());
SetSessionDescriptionMsg* msg = new SetSessionDescriptionMsg(observer); // TODO(steveanton): In C++14 this can be done with a lambda.
msg->error = std::move(error); struct Functor {
signaling_thread()->Post(RTC_FROM_HERE, this, void operator()() { observer->OnFailure(std::move(error)); }
MSG_SET_SESSIONDESCRIPTION_FAILED, msg); rtc::scoped_refptr<SetSessionDescriptionObserver> observer;
RTCError error;
};
async_invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread(),
Functor{observer, std::move(error)});
} }
void PeerConnection::PostCreateSessionDescriptionFailure( void PeerConnection::PostCreateSessionDescriptionFailure(
CreateSessionDescriptionObserver* observer, CreateSessionDescriptionObserver* observer,
RTCError error) { RTCError error) {
RTC_DCHECK(!error.ok()); RTC_DCHECK(!error.ok());
CreateSessionDescriptionMsg* msg = new CreateSessionDescriptionMsg(observer); // TODO(steveanton): In C++14 this can be done with a lambda.
msg->error = std::move(error); struct Functor {
signaling_thread()->Post(RTC_FROM_HERE, this, void operator()() { observer->OnFailure(std::move(error)); }
MSG_CREATE_SESSIONDESCRIPTION_FAILED, msg); rtc::scoped_refptr<CreateSessionDescriptionObserver> observer;
RTCError error;
};
async_invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread(),
Functor{observer, std::move(error)});
} }
void PeerConnection::GetOptionsForOffer( void PeerConnection::GetOptionsForOffer(
@ -4468,8 +4405,9 @@ void PeerConnection::OnSctpDataChannelClosed(DataChannel* channel) {
// we can't free it directly here; we need to free it asynchronously. // we can't free it directly here; we need to free it asynchronously.
sctp_data_channels_to_free_.push_back(*it); sctp_data_channels_to_free_.push_back(*it);
sctp_data_channels_.erase(it); sctp_data_channels_.erase(it);
signaling_thread()->Post(RTC_FROM_HERE, this, MSG_FREE_DATACHANNELS, async_invoker_.AsyncInvoke<void>(
nullptr); RTC_FROM_HERE, signaling_thread(),
[this] { sctp_data_channels_to_free_.clear(); });
return; return;
} }
} }
@ -6233,8 +6171,8 @@ void PeerConnection::ClearStatsCache() {
} }
void PeerConnection::RequestUsagePatternReportForTesting() { void PeerConnection::RequestUsagePatternReportForTesting() {
signaling_thread()->Post(RTC_FROM_HERE, this, MSG_REPORT_USAGE_PATTERN, async_invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread(),
nullptr); [this] { ReportUsagePattern(); });
} }
} // namespace webrtc } // namespace webrtc

View File

@ -52,7 +52,6 @@ class RtcEventLog;
class PeerConnection : public PeerConnectionInternal, class PeerConnection : public PeerConnectionInternal,
public DataChannelProviderInterface, public DataChannelProviderInterface,
public JsepTransportController::Observer, public JsepTransportController::Observer,
public rtc::MessageHandler,
public sigslot::has_slots<> { public sigslot::has_slots<> {
public: public:
enum class UsageEvent : int { enum class UsageEvent : int {
@ -288,9 +287,6 @@ class PeerConnection : public PeerConnectionInternal,
uint32_t first_ssrc; uint32_t first_ssrc;
}; };
// Implements MessageHandler.
void OnMessage(rtc::Message* msg) override;
// Plan B helpers for getting the voice/video media channels for the single // Plan B helpers for getting the voice/video media channels for the single
// audio/video transceiver, if it exists. // audio/video transceiver, if it exists.
cricket::VoiceMediaChannel* voice_media_channel() const; cricket::VoiceMediaChannel* voice_media_channel() const;
@ -395,7 +391,7 @@ class PeerConnection : public PeerConnectionInternal,
void PostSetSessionDescriptionSuccess( void PostSetSessionDescriptionSuccess(
SetSessionDescriptionObserver* observer); SetSessionDescriptionObserver* observer);
void PostSetSessionDescriptionFailure(SetSessionDescriptionObserver* observer, void PostSetSessionDescriptionFailure(SetSessionDescriptionObserver* observer,
RTCError&& error); RTCError error);
void PostCreateSessionDescriptionFailure( void PostCreateSessionDescriptionFailure(
CreateSessionDescriptionObserver* observer, CreateSessionDescriptionObserver* observer,
RTCError error); RTCError error);
@ -1033,6 +1029,7 @@ class PeerConnection : public PeerConnectionInternal,
int usage_event_accumulator_ = 0; int usage_event_accumulator_ = 0;
bool return_histogram_very_quickly_ = false; bool return_histogram_very_quickly_ = false;
rtc::AsyncInvoker async_invoker_;
}; };
} // namespace webrtc } // namespace webrtc