Annotate cricket::BaseChannel with thread guards

This CL also adds commentary to member variables that couldn't be guarded
because they're accessed from multiple threads.

Bug: webrtc:12230
Change-Id: I5193a7ef36ab25588c76ee6a1863de6a844be1dc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/195331
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32705}
This commit is contained in:
Harald Alvestrand
2020-11-26 07:24:32 +00:00
committed by Commit Bot
parent 0d863f72a8
commit 27883a2593
2 changed files with 121 additions and 62 deletions

View File

@ -160,6 +160,9 @@ BaseChannel::~BaseChannel() {
}
std::string BaseChannel::ToString() const {
// TODO(bugs.webrtc.org/12230): When media_channel_ is guarded by
// worker_thread(), rewrite this debug printout to not print the
// media type when called from non-worker-thread.
rtc::StringBuilder sb;
sb << "{mid: " << content_name_;
if (media_channel_) {
@ -170,8 +173,9 @@ std::string BaseChannel::ToString() const {
}
bool BaseChannel::ConnectToRtpTransport() {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_n()) {
RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString();
return false;
}
@ -187,6 +191,7 @@ bool BaseChannel::ConnectToRtpTransport() {
}
void BaseChannel::DisconnectFromRtpTransport() {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
rtp_transport_->UnregisterRtpDemuxerSink(this);
rtp_transport_->SignalReadyToSend.disconnect(this);
@ -196,7 +201,7 @@ void BaseChannel::DisconnectFromRtpTransport() {
}
void BaseChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) {
RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK_RUN_ON(worker_thread());
network_thread_->Invoke<void>(
RTC_FROM_HERE, [this, rtp_transport] { SetRtpTransport(rtp_transport); });
@ -213,6 +218,7 @@ void BaseChannel::Deinit() {
// functions, so need to stop this process in Deinit that is called in
// derived classes destructor.
network_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
RTC_DCHECK_RUN_ON(network_thread());
FlushRtcpMessages_n();
if (rtp_transport_) {
@ -225,15 +231,15 @@ void BaseChannel::Deinit() {
}
bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) {
if (rtp_transport == rtp_transport_) {
return true;
}
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<bool>(RTC_FROM_HERE, [this, rtp_transport] {
return SetRtpTransport(rtp_transport);
});
}
RTC_DCHECK_RUN_ON(network_thread());
if (rtp_transport == rtp_transport_) {
return true;
}
if (rtp_transport_) {
DisconnectFromRtpTransport();
@ -338,7 +344,6 @@ int BaseChannel::SetOption(SocketType type,
int BaseChannel::SetOption_n(SocketType type,
rtc::Socket::Option opt,
int value) {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
switch (type) {
case ST_RTP:
@ -376,6 +381,7 @@ void BaseChannel::OnNetworkRouteChanged(
// work correctly. Intentionally leave it broken to simplify the code and
// encourage the users to stop using non-muxing RTCP.
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [=] {
RTC_DCHECK_RUN_ON(worker_thread());
media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
});
}
@ -393,8 +399,10 @@ sigslot::signal1<const rtc::SentPacket&>& BaseChannel::SignalSentPacket() {
}
void BaseChannel::OnTransportReadyToSend(bool ready) {
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_,
[=] { media_channel_->OnReadyToSend(ready); });
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_, [=] {
RTC_DCHECK_RUN_ON(worker_thread());
media_channel_->OnReadyToSend(ready);
});
}
bool BaseChannel::SendPacket(bool rtcp,
@ -418,6 +426,7 @@ bool BaseChannel::SendPacket(bool rtcp,
network_thread_->Post(RTC_FROM_HERE, this, message_id, data);
return true;
}
RTC_DCHECK_RUN_ON(network_thread());
TRACE_EVENT0("webrtc", "BaseChannel::SendPacket");
@ -506,25 +515,34 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) {
void BaseChannel::UpdateRtpHeaderExtensionMap(
const RtpHeaderExtensions& header_extensions) {
RTC_DCHECK(rtp_transport_);
// Update the header extension map on network thread in case there is data
// race.
// TODO(zhihuang): Add an rtc::ThreadChecker make sure to RtpTransport won't
// be accessed from different threads.
//
// NOTE: This doesn't take the BUNDLE case in account meaning the RTP header
// extension maps are not merged when BUNDLE is enabled. This is fine because
// the ID for MID should be consistent among all the RTP transports.
network_thread_->Invoke<void>(RTC_FROM_HERE, [this, &header_extensions] {
RTC_DCHECK_RUN_ON(network_thread());
rtp_transport_->UpdateRtpHeaderExtensionMap(header_extensions);
});
}
bool BaseChannel::RegisterRtpDemuxerSink() {
bool BaseChannel::RegisterRtpDemuxerSink_w() {
// Copy demuxer criteria, since they're a worker-thread variable
// and we want to pass them to the network thread
return network_thread_->Invoke<bool>(
RTC_FROM_HERE, [this, demuxer_criteria = demuxer_criteria_] {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(rtp_transport_);
return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria, this);
});
}
bool BaseChannel::RegisterRtpDemuxerSink_n() {
RTC_DCHECK(rtp_transport_);
return network_thread_->Invoke<bool>(RTC_FROM_HERE, [this] {
return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
});
// TODO(bugs.webrtc.org/12230): This accesses demuxer_criteria_ on the
// networking thread.
return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
}
void BaseChannel::EnableMedia_w() {
@ -557,7 +575,6 @@ void BaseChannel::UpdateWritableState_n() {
}
void BaseChannel::ChannelWritable_n() {
RTC_DCHECK_RUN_ON(network_thread());
if (writable_) {
return;
}
@ -571,7 +588,6 @@ void BaseChannel::ChannelWritable_n() {
}
void BaseChannel::ChannelNotWritable_n() {
RTC_DCHECK_RUN_ON(network_thread());
if (!writable_)
return;
@ -581,12 +597,10 @@ void BaseChannel::ChannelNotWritable_n() {
}
bool BaseChannel::AddRecvStream_w(const StreamParams& sp) {
RTC_DCHECK(worker_thread() == rtc::Thread::Current());
return media_channel()->AddRecvStream(sp);
}
bool BaseChannel::RemoveRecvStream_w(uint32_t ssrc) {
RTC_DCHECK(worker_thread() == rtc::Thread::Current());
return media_channel()->RemoveRecvStream(ssrc);
}
@ -596,7 +610,6 @@ void BaseChannel::ResetUnsignaledRecvStream_w() {
}
bool BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) {
RTC_DCHECK_RUN_ON(worker_thread());
if (enabled == payload_type_demuxing_enabled_) {
return true;
}
@ -609,7 +622,7 @@ bool BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) {
// there is no straightforward way to identify those streams.
media_channel()->ResetUnsignaledRecvStream();
demuxer_criteria_.payload_types.clear();
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to disable payload type demuxing for "
<< ToString();
return false;
@ -617,7 +630,7 @@ bool BaseChannel::SetPayloadTypeDemuxingEnabled_w(bool enabled) {
} else if (!payload_types_.empty()) {
demuxer_criteria_.payload_types.insert(payload_types_.begin(),
payload_types_.end());
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to enable payload type demuxing for "
<< ToString();
return false;
@ -765,7 +778,7 @@ bool BaseChannel::UpdateRemoteStreams_w(
new_stream.ssrcs.end());
}
// Re-register the sink to update the receiving ssrcs.
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to set up demuxing for " << ToString();
ret = false;
}
@ -775,7 +788,6 @@ bool BaseChannel::UpdateRemoteStreams_w(
RtpHeaderExtensions BaseChannel::GetFilteredRtpHeaderExtensions(
const RtpHeaderExtensions& extensions) {
RTC_DCHECK(rtp_transport_);
if (crypto_options_.srtp.enable_encrypted_rtp_header_extensions) {
RtpHeaderExtensions filtered;
absl::c_copy_if(extensions, std::back_inserter(filtered),
@ -826,7 +838,6 @@ void BaseChannel::ClearHandledPayloadTypes() {
void BaseChannel::FlushRtcpMessages_n() {
// Flush all remaining RTCP messages. This should only be called in
// destructor.
RTC_DCHECK_RUN_ON(network_thread());
rtc::MessageList rtcp_messages;
network_thread_->Clear(this, MSG_SEND_RTCP_PACKET, &rtcp_messages);
for (const auto& message : rtcp_messages) {
@ -836,7 +847,6 @@ void BaseChannel::FlushRtcpMessages_n() {
}
void BaseChannel::SignalSentPacket_n(const rtc::SentPacket& sent_packet) {
RTC_DCHECK_RUN_ON(network_thread());
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, worker_thread_,
[this, sent_packet] {
RTC_DCHECK_RUN_ON(worker_thread());
@ -881,6 +891,7 @@ void VoiceChannel::Init_w(webrtc::RtpTransportInternal* rtp_transport) {
void VoiceChannel::UpdateMediaSendRecvState_w() {
// Render incoming data if we're the active call, and we have the local
// content. We receive data on the default channel and multiplexed streams.
RTC_DCHECK_RUN_ON(worker_thread());
bool recv = IsReadyToReceiveMedia_w();
media_channel()->SetPlayout(recv);
@ -931,7 +942,7 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
MaybeAddHandledPayloadType(codec.id);
}
// Need to re-register the sink to update the handled payload.
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to set up audio demuxing for " << ToString();
return false;
}
@ -997,7 +1008,7 @@ bool VoiceChannel::SetRemoteContent_w(const MediaContentDescription* content,
"disable payload type demuxing for "
<< ToString();
ClearHandledPayloadTypes();
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to update audio demuxing for " << ToString();
return false;
}
@ -1048,6 +1059,7 @@ VideoChannel::~VideoChannel() {
void VideoChannel::UpdateMediaSendRecvState_w() {
// Send outgoing data if we're the active call, we have the remote content,
// and we have had some form of connectivity.
RTC_DCHECK_RUN_ON(worker_thread());
bool send = IsReadyToSendMedia_w();
if (!media_channel()->SetSend(send)) {
RTC_LOG(LS_ERROR) << "Failed to SetSend on video channel: " + ToString();
@ -1124,7 +1136,7 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
MaybeAddHandledPayloadType(codec.id);
}
// Need to re-register the sink to update the handled payload.
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to set up video demuxing for " << ToString();
return false;
}
@ -1234,7 +1246,7 @@ bool VideoChannel::SetRemoteContent_w(const MediaContentDescription* content,
"disable payload type demuxing for "
<< ToString();
ClearHandledPayloadTypes();
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to update video demuxing for " << ToString();
return false;
}
@ -1349,7 +1361,7 @@ bool RtpDataChannel::SetLocalContent_w(const MediaContentDescription* content,
MaybeAddHandledPayloadType(codec.id);
}
// Need to re-register the sink to update the handled payload.
if (!RegisterRtpDemuxerSink()) {
if (!RegisterRtpDemuxerSink_w()) {
RTC_LOG(LS_ERROR) << "Failed to set up data demuxing for " << ToString();
return false;
}
@ -1437,6 +1449,7 @@ bool RtpDataChannel::SetRemoteContent_w(const MediaContentDescription* content,
void RtpDataChannel::UpdateMediaSendRecvState_w() {
// Render incoming data if we're the active call, and we have the local
// content. We receive data on the default channel and multiplexed streams.
RTC_DCHECK_RUN_ON(worker_thread());
bool recv = IsReadyToReceiveMedia_w();
if (!media_channel()->SetReceive(recv)) {
RTC_LOG(LS_ERROR) << "Failed to SetReceive on data channel: " << ToString();

View File

@ -106,6 +106,13 @@ class BaseChannel : public ChannelInterface,
// This function returns true if using SRTP (DTLS-based keying or SDES).
bool srtp_active() const {
// TODO(bugs.webrtc.org/12230): At least some tests call this function
// from other threads.
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<bool>(RTC_FROM_HERE,
[this] { return srtp_active(); });
}
RTC_DCHECK_RUN_ON(network_thread());
return rtp_transport_ && rtp_transport_->IsSrtpActive();
}
@ -117,7 +124,16 @@ class BaseChannel : public ChannelInterface,
// internally. It would replace the |SetTransports| and its variants.
bool SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) override;
webrtc::RtpTransportInternal* rtp_transport() const { return rtp_transport_; }
webrtc::RtpTransportInternal* rtp_transport() const {
// TODO(bugs.webrtc.org/12230): At least some tests call this function
// from other threads.
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<webrtc::RtpTransportInternal*>(
RTC_FROM_HERE, [this] { return rtp_transport(); });
}
RTC_DCHECK_RUN_ON(network_thread());
return rtp_transport_;
}
// Channel control
bool SetLocalContent(const MediaContentDescription* content,
@ -156,7 +172,8 @@ class BaseChannel : public ChannelInterface,
// Only public for unit tests. Otherwise, consider protected.
int SetOption(SocketType type, rtc::Socket::Option o, int val) override;
int SetOption_n(SocketType type, rtc::Socket::Option o, int val);
int SetOption_n(SocketType type, rtc::Socket::Option o, int val)
RTC_RUN_ON(network_thread());
// RtpPacketSinkInterface overrides.
void OnRtpPacket(const webrtc::RtpPacketReceived& packet) override;
@ -167,14 +184,24 @@ class BaseChannel : public ChannelInterface,
transport_name_ = transport_name;
}
MediaChannel* media_channel() const override { return media_channel_.get(); }
MediaChannel* media_channel() const override {
// TODO(bugs.webrtc.org/12230): Called on multiple threads,
// including from StatsCollector::ExtractMediaInfo.
// RTC_DCHECK_RUN_ON(worker_thread());
return media_channel_.get();
}
protected:
bool was_ever_writable() const { return was_ever_writable_; }
bool was_ever_writable() const {
RTC_DCHECK_RUN_ON(network_thread());
return was_ever_writable_;
}
void set_local_content_direction(webrtc::RtpTransceiverDirection direction) {
RTC_DCHECK_RUN_ON(worker_thread());
local_content_direction_ = direction;
}
void set_remote_content_direction(webrtc::RtpTransceiverDirection direction) {
RTC_DCHECK_RUN_ON(worker_thread());
remote_content_direction_ = direction;
}
// These methods verify that:
@ -187,11 +214,11 @@ class BaseChannel : public ChannelInterface,
//
// When any of these properties change, UpdateMediaSendRecvState_w should be
// called.
bool IsReadyToReceiveMedia_w() const;
bool IsReadyToSendMedia_w() const;
bool IsReadyToReceiveMedia_w() const RTC_RUN_ON(worker_thread());
bool IsReadyToSendMedia_w() const RTC_RUN_ON(worker_thread());
rtc::Thread* signaling_thread() { return signaling_thread_; }
void FlushRtcpMessages_n();
void FlushRtcpMessages_n() RTC_RUN_ON(network_thread());
// NetworkInterface implementation, called by MediaEngine
bool SendPacket(rtc::CopyOnWriteBuffer* packet,
@ -211,22 +238,23 @@ class BaseChannel : public ChannelInterface,
rtc::CopyOnWriteBuffer* packet,
const rtc::PacketOptions& options);
void EnableMedia_w();
void DisableMedia_w();
void EnableMedia_w() RTC_RUN_ON(worker_thread());
void DisableMedia_w() RTC_RUN_ON(worker_thread());
// Performs actions if the RTP/RTCP writable state changed. This should
// be called whenever a channel's writable state changes or when RTCP muxing
// becomes active/inactive.
void UpdateWritableState_n();
void ChannelWritable_n();
void ChannelNotWritable_n();
void UpdateWritableState_n() RTC_RUN_ON(network_thread());
void ChannelWritable_n() RTC_RUN_ON(network_thread());
void ChannelNotWritable_n() RTC_RUN_ON(network_thread());
bool AddRecvStream_w(const StreamParams& sp);
bool RemoveRecvStream_w(uint32_t ssrc);
void ResetUnsignaledRecvStream_w();
bool SetPayloadTypeDemuxingEnabled_w(bool enabled);
bool AddSendStream_w(const StreamParams& sp);
bool RemoveSendStream_w(uint32_t ssrc);
bool AddRecvStream_w(const StreamParams& sp) RTC_RUN_ON(worker_thread());
bool RemoveRecvStream_w(uint32_t ssrc) RTC_RUN_ON(worker_thread());
void ResetUnsignaledRecvStream_w() RTC_RUN_ON(worker_thread());
bool SetPayloadTypeDemuxingEnabled_w(bool enabled)
RTC_RUN_ON(worker_thread());
bool AddSendStream_w(const StreamParams& sp) RTC_RUN_ON(worker_thread());
bool RemoveSendStream_w(uint32_t ssrc) RTC_RUN_ON(worker_thread());
// Should be called whenever the conditions for
// IsReadyToReceiveMedia/IsReadyToSendMedia are satisfied (or unsatisfied).
@ -236,10 +264,12 @@ class BaseChannel : public ChannelInterface,
bool UpdateLocalStreams_w(const std::vector<StreamParams>& streams,
webrtc::SdpType type,
std::string* error_desc);
std::string* error_desc)
RTC_RUN_ON(worker_thread());
bool UpdateRemoteStreams_w(const std::vector<StreamParams>& streams,
webrtc::SdpType type,
std::string* error_desc);
std::string* error_desc)
RTC_RUN_ON(worker_thread());
virtual bool SetLocalContent_w(const MediaContentDescription* content,
webrtc::SdpType type,
std::string* error_desc) = 0;
@ -271,7 +301,8 @@ class BaseChannel : public ChannelInterface,
void UpdateRtpHeaderExtensionMap(
const RtpHeaderExtensions& header_extensions);
bool RegisterRtpDemuxerSink();
bool RegisterRtpDemuxerSink_w() RTC_RUN_ON(worker_thread());
bool RegisterRtpDemuxerSink_n() RTC_RUN_ON(network_thread());
// Return description of media channel to facilitate logging
std::string ToString() const;
@ -281,8 +312,9 @@ class BaseChannel : public ChannelInterface,
private:
bool ConnectToRtpTransport();
void DisconnectFromRtpTransport();
void SignalSentPacket_n(const rtc::SentPacket& sent_packet);
bool IsReadyToSendMedia_n() const;
void SignalSentPacket_n(const rtc::SentPacket& sent_packet)
RTC_RUN_ON(network_thread());
bool IsReadyToSendMedia_n() const RTC_RUN_ON(network_thread());
rtc::Thread* const worker_thread_;
rtc::Thread* const network_thread_;
@ -296,27 +328,39 @@ class BaseChannel : public ChannelInterface,
const std::string content_name_;
// Won't be set when using raw packet transports. SDP-specific thing.
// TODO(bugs.webrtc.org/12230): Written on network thread, read on
// worker thread (at least).
std::string transport_name_;
webrtc::RtpTransportInternal* rtp_transport_ = nullptr;
webrtc::RtpTransportInternal* rtp_transport_
RTC_GUARDED_BY(network_thread()) = nullptr;
std::vector<std::pair<rtc::Socket::Option, int> > socket_options_;
std::vector<std::pair<rtc::Socket::Option, int> > rtcp_socket_options_;
std::vector<std::pair<rtc::Socket::Option, int> > socket_options_
RTC_GUARDED_BY(network_thread());
std::vector<std::pair<rtc::Socket::Option, int> > rtcp_socket_options_
RTC_GUARDED_BY(network_thread());
// TODO(bugs.webrtc.org/12230): writable_ is accessed in tests
// outside of the network thread.
bool writable_ = false;
bool was_ever_writable_ = false;
bool was_ever_writable_ RTC_GUARDED_BY(network_thread()) = false;
const bool srtp_required_ = true;
webrtc::CryptoOptions crypto_options_;
const webrtc::CryptoOptions crypto_options_;
// MediaChannel related members that should be accessed from the worker
// thread.
// TODO(bugs.webrtc.org/12230): written on worker thread, accessed by
// multiple threads.
std::unique_ptr<MediaChannel> media_channel_;
// Currently the |enabled_| flag is accessed from the signaling thread as
// well, but it can be changed only when signaling thread does a synchronous
// call to the worker thread, so it should be safe.
bool enabled_ = false;
bool payload_type_demuxing_enabled_ RTC_GUARDED_BY(worker_thread()) = true;
std::vector<StreamParams> local_streams_;
std::vector<StreamParams> remote_streams_;
std::vector<StreamParams> local_streams_ RTC_GUARDED_BY(worker_thread());
std::vector<StreamParams> remote_streams_ RTC_GUARDED_BY(worker_thread());
// TODO(bugs.webrtc.org/12230): local_content_direction and
// remote_content_direction are set on the worker thread, but accessed on the
// network thread.
webrtc::RtpTransceiverDirection local_content_direction_ =
webrtc::RtpTransceiverDirection::kInactive;
webrtc::RtpTransceiverDirection remote_content_direction_ =
@ -324,6 +368,8 @@ class BaseChannel : public ChannelInterface,
// Cached list of payload types, used if payload type demuxing is re-enabled.
std::set<uint8_t> payload_types_ RTC_GUARDED_BY(worker_thread());
// TODO(bugs.webrtc.org/12239): Modified on worker thread, accessed
// on network thread in RegisterRtpDemuxerSink_n (called from Init_w)
webrtc::RtpDemuxerCriteria demuxer_criteria_;
// This generator is used to generate SSRCs for local streams.
// This is needed in cases where SSRCs are not negotiated or set explicitly