diff --git a/pc/data_channel.cc b/pc/data_channel.cc index 795bf8d315..e4f658cbec 100644 --- a/pc/data_channel.cc +++ b/pc/data_channel.cc @@ -137,9 +137,12 @@ rtc::scoped_refptr DataChannel::Create( DataChannelProviderInterface* provider, cricket::DataChannelType dct, const std::string& label, - const InternalDataChannelInit& config) { + const InternalDataChannelInit& config, + rtc::Thread* signaling_thread, + rtc::Thread* network_thread) { rtc::scoped_refptr channel( - new rtc::RefCountedObject(config, provider, dct, label)); + new rtc::RefCountedObject(config, provider, dct, label, + signaling_thread, network_thread)); if (!channel->Init()) { return nullptr; } @@ -155,8 +158,12 @@ bool DataChannel::IsSctpLike(cricket::DataChannelType type) { DataChannel::DataChannel(const InternalDataChannelInit& config, DataChannelProviderInterface* provider, cricket::DataChannelType dct, - const std::string& label) - : internal_id_(GenerateUniqueId()), + const std::string& label, + rtc::Thread* signaling_thread, + rtc::Thread* network_thread) + : signaling_thread_(signaling_thread), + network_thread_(network_thread), + internal_id_(GenerateUniqueId()), label_(label), config_(config), observer_(nullptr), @@ -174,9 +181,12 @@ DataChannel::DataChannel(const InternalDataChannelInit& config, receive_ssrc_set_(false), writable_(false), send_ssrc_(0), - receive_ssrc_(0) {} + receive_ssrc_(0) { + RTC_DCHECK_RUN_ON(signaling_thread_); +} bool DataChannel::Init() { + RTC_DCHECK_RUN_ON(signaling_thread_); if (data_channel_type_ == cricket::DCT_RTP) { if (config_.reliable || config_.id != -1 || config_.maxRetransmits || config_.maxRetransmitTime) { @@ -229,18 +239,23 @@ bool DataChannel::Init() { return true; } -DataChannel::~DataChannel() {} +DataChannel::~DataChannel() { + RTC_DCHECK_RUN_ON(signaling_thread_); +} void DataChannel::RegisterObserver(DataChannelObserver* observer) { + RTC_DCHECK_RUN_ON(signaling_thread_); observer_ = observer; DeliverQueuedReceivedData(); } void DataChannel::UnregisterObserver() { - observer_ = NULL; + RTC_DCHECK_RUN_ON(signaling_thread_); + observer_ = nullptr; } bool DataChannel::reliable() const { + // May be called on any thread. if (data_channel_type_ == cricket::DCT_RTP) { return false; } else { @@ -249,10 +264,12 @@ bool DataChannel::reliable() const { } uint64_t DataChannel::buffered_amount() const { + RTC_DCHECK_RUN_ON(signaling_thread_); return buffered_amount_; } void DataChannel::Close() { + RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == kClosed) return; send_ssrc_ = 0; @@ -262,11 +279,42 @@ void DataChannel::Close() { UpdateState(); } +DataChannel::DataState DataChannel::state() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return state_; +} + RTCError DataChannel::error() const { + RTC_DCHECK_RUN_ON(signaling_thread_); return error_; } +uint32_t DataChannel::messages_sent() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return messages_sent_; +} + +uint64_t DataChannel::bytes_sent() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return bytes_sent_; +} + +uint32_t DataChannel::messages_received() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return messages_received_; +} + +uint64_t DataChannel::bytes_received() const { + RTC_DCHECK_RUN_ON(signaling_thread_); + return bytes_received_; +} + bool DataChannel::Send(const DataBuffer& buffer) { + RTC_DCHECK_RUN_ON(signaling_thread_); + // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network + // thread. Bring buffer management etc to the network thread and keep the + // operational state management on the signaling thread. + buffered_amount_ += buffer.size(); if (state_ != kOpen) { return false; @@ -306,6 +354,7 @@ bool DataChannel::Send(const DataBuffer& buffer) { } void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP); if (receive_ssrc_set_) { @@ -329,6 +378,7 @@ void DataChannel::SetSctpSid(int sid) { } void DataChannel::OnClosingProcedureStartedRemotely(int sid) { + RTC_DCHECK_RUN_ON(signaling_thread_); if (IsSctpLike(data_channel_type_) && sid == config_.id && state_ != kClosing && state_ != kClosed) { // Don't bother sending queued data since the side that initiated the @@ -345,6 +395,7 @@ void DataChannel::OnClosingProcedureStartedRemotely(int sid) { } void DataChannel::OnClosingProcedureComplete(int sid) { + RTC_DCHECK_RUN_ON(signaling_thread_); if (IsSctpLike(data_channel_type_) && sid == config_.id) { // If the closing procedure is complete, we should have finished sending // all pending data and transitioned to kClosing already. @@ -356,6 +407,7 @@ void DataChannel::OnClosingProcedureComplete(int sid) { } void DataChannel::OnTransportChannelCreated() { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(IsSctpLike(data_channel_type_)); if (!connected_to_provider_) { connected_to_provider_ = provider_->ConnectDataChannel(this); @@ -385,6 +437,7 @@ void DataChannel::RemotePeerRequestClose() { } void DataChannel::SetSendSsrc(uint32_t send_ssrc) { + RTC_DCHECK_RUN_ON(signaling_thread_); RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP); if (send_ssrc_set_) { return; @@ -396,6 +449,7 @@ void DataChannel::SetSendSsrc(uint32_t send_ssrc) { void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& payload) { + RTC_DCHECK_RUN_ON(signaling_thread_); if (data_channel_type_ == cricket::DCT_RTP && params.ssrc != receive_ssrc_) { return; } @@ -462,6 +516,8 @@ void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, } void DataChannel::OnChannelReady(bool writable) { + RTC_DCHECK_RUN_ON(signaling_thread_); + writable_ = writable; if (!writable) { return; @@ -473,6 +529,8 @@ void DataChannel::OnChannelReady(bool writable) { } void DataChannel::CloseAbruptlyWithError(RTCError error) { + RTC_DCHECK_RUN_ON(signaling_thread_); + if (state_ == kClosed) { return; } @@ -501,6 +559,7 @@ void DataChannel::CloseAbruptlyWithDataChannelFailure( } void DataChannel::UpdateState() { + RTC_DCHECK_RUN_ON(signaling_thread_); // UpdateState determines what to do from a few state variables. Include // all conditions required for each state transition here for // clarity. OnChannelReady(true) will send any queued data and then invoke @@ -568,6 +627,7 @@ void DataChannel::UpdateState() { } void DataChannel::SetState(DataState state) { + RTC_DCHECK_RUN_ON(signaling_thread_); if (state_ == state) { return; } @@ -584,6 +644,7 @@ void DataChannel::SetState(DataState state) { } void DataChannel::DisconnectFromProvider() { + RTC_DCHECK_RUN_ON(signaling_thread_); if (!connected_to_provider_) return; @@ -592,6 +653,7 @@ void DataChannel::DisconnectFromProvider() { } void DataChannel::DeliverQueuedReceivedData() { + RTC_DCHECK_RUN_ON(signaling_thread_); if (!observer_) { return; } @@ -605,6 +667,7 @@ void DataChannel::DeliverQueuedReceivedData() { } void DataChannel::SendQueuedDataMessages() { + RTC_DCHECK_RUN_ON(signaling_thread_); if (queued_send_data_.Empty()) { return; } @@ -623,6 +686,7 @@ void DataChannel::SendQueuedDataMessages() { bool DataChannel::SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked) { + RTC_DCHECK_RUN_ON(signaling_thread_); cricket::SendDataParams send_params; if (IsSctpLike(data_channel_type_)) { @@ -681,6 +745,7 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, } bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { + RTC_DCHECK_RUN_ON(signaling_thread_); size_t start_buffered_amount = queued_send_data_.byte_count(); if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) { RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; @@ -691,6 +756,7 @@ bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { } void DataChannel::SendQueuedControlMessages() { + RTC_DCHECK_RUN_ON(signaling_thread_); PacketQueue control_packets; control_packets.Swap(&queued_control_data_); @@ -701,10 +767,12 @@ void DataChannel::SendQueuedControlMessages() { } void DataChannel::QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer) { + RTC_DCHECK_RUN_ON(signaling_thread_); queued_control_data_.PushBack(std::make_unique(buffer, true)); } bool DataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { + RTC_DCHECK_RUN_ON(signaling_thread_); bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen; RTC_DCHECK(IsSctpLike(data_channel_type_)); diff --git a/pc/data_channel.h b/pc/data_channel.h index 1ee267924a..e84325022d 100644 --- a/pc/data_channel.h +++ b/pc/data_channel.h @@ -117,47 +117,51 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { DataChannelProviderInterface* provider, cricket::DataChannelType dct, const std::string& label, - const InternalDataChannelInit& config); + const InternalDataChannelInit& config, + rtc::Thread* signaling_thread, + rtc::Thread* network_thread); static bool IsSctpLike(cricket::DataChannelType type); - virtual void RegisterObserver(DataChannelObserver* observer); - virtual void UnregisterObserver(); + void RegisterObserver(DataChannelObserver* observer) override; + void UnregisterObserver() override; - virtual std::string label() const { return label_; } - virtual bool reliable() const; - virtual bool ordered() const { return config_.ordered; } + std::string label() const override { return label_; } + bool reliable() const override; + bool ordered() const override { return config_.ordered; } // Backwards compatible accessors - virtual uint16_t maxRetransmitTime() const { + uint16_t maxRetransmitTime() const override { return config_.maxRetransmitTime ? *config_.maxRetransmitTime : static_cast(-1); } - virtual uint16_t maxRetransmits() const { + uint16_t maxRetransmits() const override { return config_.maxRetransmits ? *config_.maxRetransmits : static_cast(-1); } - virtual absl::optional maxPacketLifeTime() const { + absl::optional maxPacketLifeTime() const override { return config_.maxRetransmitTime; } - virtual absl::optional maxRetransmitsOpt() const { + absl::optional maxRetransmitsOpt() const override { return config_.maxRetransmits; } - virtual std::string protocol() const { return config_.protocol; } - virtual bool negotiated() const { return config_.negotiated; } - virtual int id() const { return config_.id; } - virtual Priority priority() const { + std::string protocol() const override { return config_.protocol; } + bool negotiated() const override { return config_.negotiated; } + int id() const override { return config_.id; } + Priority priority() const override { return config_.priority ? *config_.priority : Priority::kLow; } + virtual int internal_id() const { return internal_id_; } - virtual uint64_t buffered_amount() const; - virtual void Close(); - virtual DataState state() const { return state_; } - virtual RTCError error() const; - virtual uint32_t messages_sent() const { return messages_sent_; } - virtual uint64_t bytes_sent() const { return bytes_sent_; } - virtual uint32_t messages_received() const { return messages_received_; } - virtual uint64_t bytes_received() const { return bytes_received_; } - virtual bool Send(const DataBuffer& buffer); + + uint64_t buffered_amount() const override; + void Close() override; + DataState state() const override; + RTCError error() const override; + uint32_t messages_sent() const override; + uint64_t bytes_sent() const override; + uint32_t messages_received() const override; + uint64_t bytes_received() const override; + bool Send(const DataBuffer& buffer) override; // Close immediately, ignoring any queued data or closing procedure. // This is called for RTP data channels when SDP indicates a channel should @@ -234,8 +238,10 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { DataChannel(const InternalDataChannelInit& config, DataChannelProviderInterface* client, cricket::DataChannelType dct, - const std::string& label); - virtual ~DataChannel(); + const std::string& label, + rtc::Thread* signaling_thread, + rtc::Thread* network_thread); + ~DataChannel() override; private: // A packet queue which tracks the total queued bytes. Queued packets are @@ -284,36 +290,38 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { void QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer); bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer); + rtc::Thread* const signaling_thread_; + rtc::Thread* const network_thread_; const int internal_id_; const std::string label_; const InternalDataChannelInit config_; - DataChannelObserver* observer_; - DataState state_; - RTCError error_; - uint32_t messages_sent_; - uint64_t bytes_sent_; - uint32_t messages_received_; - uint64_t bytes_received_; + DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_); + DataState state_ RTC_GUARDED_BY(signaling_thread_); + RTCError error_ RTC_GUARDED_BY(signaling_thread_); + uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_); + uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_); + uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_); + uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_); // Number of bytes of data that have been queued using Send(). Increased // before each transport send and decreased after each successful send. - uint64_t buffered_amount_; + uint64_t buffered_amount_ RTC_GUARDED_BY(signaling_thread_); const cricket::DataChannelType data_channel_type_; - DataChannelProviderInterface* provider_; - HandshakeState handshake_state_; - bool connected_to_provider_; - bool send_ssrc_set_; - bool receive_ssrc_set_; - bool writable_; + DataChannelProviderInterface* const provider_; + HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_); + bool connected_to_provider_ RTC_GUARDED_BY(signaling_thread_); + bool send_ssrc_set_ RTC_GUARDED_BY(signaling_thread_); + bool receive_ssrc_set_ RTC_GUARDED_BY(signaling_thread_); + bool writable_ RTC_GUARDED_BY(signaling_thread_); // Did we already start the graceful SCTP closing procedure? - bool started_closing_procedure_ = false; - uint32_t send_ssrc_; - uint32_t receive_ssrc_; + bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false; + uint32_t send_ssrc_ RTC_GUARDED_BY(signaling_thread_); + uint32_t receive_ssrc_ RTC_GUARDED_BY(signaling_thread_); // Control messages that always have to get sent out before any queued // data. - PacketQueue queued_control_data_; - PacketQueue queued_received_data_; - PacketQueue queued_send_data_; - rtc::AsyncInvoker invoker_; + PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_); + PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_); + PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_); + rtc::AsyncInvoker invoker_ RTC_GUARDED_BY(signaling_thread_); }; // Define proxy for DataChannelInterface. @@ -341,6 +349,7 @@ PROXY_CONSTMETHOD0(uint32_t, messages_received) PROXY_CONSTMETHOD0(uint64_t, bytes_received) PROXY_CONSTMETHOD0(uint64_t, buffered_amount) PROXY_METHOD0(void, Close) +// TODO(bugs.webrtc.org/11547): Change to run on the network thread. PROXY_METHOD1(bool, Send, const DataBuffer&) END_PROXY_MAP() diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc index e9ea742c44..9891d5025f 100644 --- a/pc/data_channel_controller.cc +++ b/pc/data_channel_controller.cc @@ -25,37 +25,10 @@ bool DataChannelController::HasDataChannels() const { bool DataChannelController::SendData(const cricket::SendDataParams& params, const rtc::CopyOnWriteBuffer& payload, cricket::SendDataResult* result) { - // RTC_DCHECK_RUN_ON(signaling_thread()); - if (data_channel_transport()) { - SendDataParams send_params; - send_params.type = ToWebrtcDataMessageType(params.type); - send_params.ordered = params.ordered; - if (params.max_rtx_count >= 0) { - send_params.max_rtx_count = params.max_rtx_count; - } else if (params.max_rtx_ms >= 0) { - send_params.max_rtx_ms = params.max_rtx_ms; - } - - RTCError error = network_thread()->Invoke( - RTC_FROM_HERE, [this, params, send_params, payload] { - return data_channel_transport()->SendData(params.sid, send_params, - payload); - }); - - if (error.ok()) { - *result = cricket::SendDataResult::SDR_SUCCESS; - return true; - } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) { - // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked. - // TODO(mellem): Stop using RTCError here and get rid of the mapping. - *result = cricket::SendDataResult::SDR_BLOCK; - return false; - } - *result = cricket::SendDataResult::SDR_ERROR; - return false; - } else if (rtp_data_channel()) { + if (data_channel_transport()) + return DataChannelSendData(params, payload, result); + if (rtp_data_channel()) return rtp_data_channel()->SendData(params, payload, result); - } RTC_LOG(LS_ERROR) << "SendData called before transport is ready"; return false; } @@ -146,6 +119,14 @@ void DataChannelController::OnDataReceived( data_channel_transport_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), [this, params, buffer] { RTC_DCHECK_RUN_ON(signaling_thread()); + // TODO(bugs.webrtc.org/11547): The data being received should be + // delivered on the network thread. The way HandleOpenMessage_s works + // right now is that it's called for all types of buffers and operates + // as a selector function. Change this so that it's only called for + // buffers that it should be able to handle. Once we do that, we can + // deliver all other buffers on the network thread (change + // SignalDataChannelTransportReceivedData_s to + // SignalDataChannelTransportReceivedData_n). if (!HandleOpenMessage_s(params, buffer)) { SignalDataChannelTransportReceivedData_s(params, buffer); } @@ -261,6 +242,7 @@ void DataChannelController::OnDataChannelOpenMessage( return; } + // TODO(bugs.webrtc.org/11547): Inject the network thread as well. rtc::scoped_refptr proxy_channel = DataChannelProxy::Create(signaling_thread(), channel); pc_->Observer()->OnDataChannel(std::move(proxy_channel)); @@ -299,7 +281,8 @@ DataChannelController::InternalCreateDataChannel( } rtc::scoped_refptr channel( - DataChannel::Create(this, data_channel_type(), label, new_config)); + DataChannel::Create(this, data_channel_type(), label, new_config, + signaling_thread(), network_thread())); if (!channel) { sid_allocator_.ReleaseSid(new_config.id); return nullptr; @@ -424,9 +407,10 @@ void DataChannelController::UpdateLocalRtpDataChannels( void DataChannelController::UpdateRemoteRtpDataChannels( const cricket::StreamParamsVec& streams) { + RTC_DCHECK_RUN_ON(signaling_thread()); + std::vector existing_channels; - RTC_DCHECK_RUN_ON(signaling_thread()); // Find new and active data channels. for (const cricket::StreamParams& params : streams) { // The data channel label is either the mslabel or the SSRC if the mslabel @@ -447,6 +431,44 @@ void DataChannelController::UpdateRemoteRtpDataChannels( UpdateClosingRtpDataChannels(existing_channels, false); } +cricket::DataChannelType DataChannelController::data_channel_type() const { + // TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread. + // RTC_DCHECK_RUN_ON(signaling_thread()); + return data_channel_type_; +} + +void DataChannelController::set_data_channel_type( + cricket::DataChannelType type) { + RTC_DCHECK_RUN_ON(signaling_thread()); + data_channel_type_ = type; +} + +DataChannelTransportInterface* DataChannelController::data_channel_transport() + const { + // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the + // network thread. + // RTC_DCHECK_RUN_ON(network_thread()); + return data_channel_transport_; +} + +void DataChannelController::set_data_channel_transport( + DataChannelTransportInterface* transport) { + RTC_DCHECK_RUN_ON(network_thread()); + data_channel_transport_ = transport; +} + +const std::map>* +DataChannelController::rtp_data_channels() const { + RTC_DCHECK_RUN_ON(signaling_thread()); + return &rtp_data_channels_; +} + +const std::vector>* +DataChannelController::sctp_data_channels() const { + RTC_DCHECK_RUN_ON(signaling_thread()); + return &sctp_data_channels_; +} + void DataChannelController::UpdateClosingRtpDataChannels( const std::vector& active_channels, bool is_local_update) { @@ -483,11 +505,50 @@ void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label, return; } channel->SetReceiveSsrc(remote_ssrc); + // TODO(bugs.webrtc.org/11547): Inject the network thread as well. rtc::scoped_refptr proxy_channel = DataChannelProxy::Create(signaling_thread(), channel); pc_->Observer()->OnDataChannel(std::move(proxy_channel)); } +bool DataChannelController::DataChannelSendData( + const cricket::SendDataParams& params, + const rtc::CopyOnWriteBuffer& payload, + cricket::SendDataResult* result) { + // TODO(bugs.webrtc.org/11547): Expect method to be called on the network + // thread instead. Remove the Invoke() below and move assocated state to + // the network thread. + RTC_DCHECK_RUN_ON(signaling_thread()); + RTC_DCHECK(data_channel_transport()); + + SendDataParams send_params; + send_params.type = ToWebrtcDataMessageType(params.type); + send_params.ordered = params.ordered; + if (params.max_rtx_count >= 0) { + send_params.max_rtx_count = params.max_rtx_count; + } else if (params.max_rtx_ms >= 0) { + send_params.max_rtx_ms = params.max_rtx_ms; + } + + RTCError error = network_thread()->Invoke( + RTC_FROM_HERE, [this, params, send_params, payload] { + return data_channel_transport()->SendData(params.sid, send_params, + payload); + }); + + if (error.ok()) { + *result = cricket::SendDataResult::SDR_SUCCESS; + return true; + } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) { + // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked. + // TODO(mellem): Stop using RTCError here and get rid of the mapping. + *result = cricket::SendDataResult::SDR_BLOCK; + return false; + } + *result = cricket::SendDataResult::SDR_ERROR; + return false; +} + rtc::Thread* DataChannelController::network_thread() const { return pc_->network_thread(); } diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h index 60bcbb32a8..156bbe557b 100644 --- a/pc/data_channel_controller.h +++ b/pc/data_channel_controller.h @@ -89,34 +89,20 @@ class DataChannelController : public DataChannelProviderInterface, void UpdateRemoteRtpDataChannels(const cricket::StreamParamsVec& streams); // Accessors - cricket::DataChannelType data_channel_type() const { - return data_channel_type_; - } - void set_data_channel_type(cricket::DataChannelType type) { - data_channel_type_ = type; - } + cricket::DataChannelType data_channel_type() const; + void set_data_channel_type(cricket::DataChannelType type); cricket::RtpDataChannel* rtp_data_channel() const { return rtp_data_channel_; } void set_rtp_data_channel(cricket::RtpDataChannel* channel) { rtp_data_channel_ = channel; } - DataChannelTransportInterface* data_channel_transport() const { - return data_channel_transport_; - } - void set_data_channel_transport(DataChannelTransportInterface* transport) { - data_channel_transport_ = transport; - } + DataChannelTransportInterface* data_channel_transport() const; + void set_data_channel_transport(DataChannelTransportInterface* transport); const std::map>* - rtp_data_channels() const { - RTC_DCHECK_RUN_ON(signaling_thread()); - return &rtp_data_channels_; - } + rtp_data_channels() const; const std::vector>* sctp_data_channels() - const { - RTC_DCHECK_RUN_ON(signaling_thread()); - return &sctp_data_channels_; - } + const; sigslot::signal1& SignalDataChannelCreated() { RTC_DCHECK_RUN_ON(signaling_thread()); @@ -146,6 +132,11 @@ class DataChannelController : public DataChannelProviderInterface, const std::vector& active_channels, bool is_local_update) RTC_RUN_ON(signaling_thread()); + // Called from SendData when data_channel_transport() is true. + bool DataChannelSendData(const cricket::SendDataParams& params, + const rtc::CopyOnWriteBuffer& payload, + cricket::SendDataResult* result); + rtc::Thread* network_thread() const; rtc::Thread* signaling_thread() const; @@ -189,6 +180,8 @@ class DataChannelController : public DataChannelProviderInterface, // Signals from |data_channel_transport_|. These are invoked on the // signaling thread. + // TODO(bugs.webrtc.org/11547): These '_s' signals likely all belong on the + // network thread. sigslot::signal1 SignalDataChannelTransportWritable_s RTC_GUARDED_BY(signaling_thread()); sigslot::signal2set_transport_available(true); @@ -111,7 +114,8 @@ class StateSignalsListener : public sigslot::has_slots<> { TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) { provider_->set_transport_available(true); rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_); + DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_TRUE(provider_->IsConnected(dc.get())); // The sid is not set yet, so it should not have added the streams. @@ -305,7 +309,8 @@ TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) { webrtc::InternalDataChannelInit init; init.id = 1; rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init); + DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, dc->state()); EXPECT_TRUE_WAIT(webrtc::DataChannelInterface::kOpen == dc->state(), 1000); } @@ -318,7 +323,8 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceivesOpenAck) { init.id = 1; init.ordered = false; rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init); + DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000); @@ -348,7 +354,8 @@ TEST_F(SctpDataChannelTest, SendUnorderedAfterReceiveData) { init.id = 1; init.ordered = false; rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init); + DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000); @@ -449,7 +456,8 @@ TEST_F(SctpDataChannelTest, NoMsgSentIfNegotiatedAndNotFromOpenMsg) { SetChannelReady(); rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config); + DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000); EXPECT_EQ(0U, provider_->last_send_data_params().ssrc); @@ -512,7 +520,8 @@ TEST_F(SctpDataChannelTest, OpenAckSentIfCreatedFromOpenMessage) { SetChannelReady(); rtc::scoped_refptr dc = - DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config); + DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config, + rtc::Thread::Current(), rtc::Thread::Current()); EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000); diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index e581ac0534..76f87f270e 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc @@ -2211,6 +2211,7 @@ rtc::scoped_refptr PeerConnection::CreateDataChannel( UpdateNegotiationNeeded(); } NoteUsageEvent(UsageEvent::DATA_ADDED); + // TODO(bugs.webrtc.org/11547): Inject the network thread as well. return DataChannelProxy::Create(signaling_thread(), channel.get()); } @@ -6714,6 +6715,8 @@ bool PeerConnection::CreateDataChannel(const std::string& mid) { case cricket::DCT_RTP: default: RtpTransportInternal* rtp_transport = GetRtpTransport(mid); + // TODO(bugs.webrtc.org/9987): set_rtp_data_channel() should be called on + // the network thread like set_data_channel_transport is. data_channel_controller_.set_rtp_data_channel( channel_manager()->CreateRtpDataChannel( configuration_.media_config, rtp_transport, signaling_thread(), diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc index e0965af56e..013965c486 100644 --- a/pc/rtc_stats_collector_unittest.cc +++ b/pc/rtc_stats_collector_unittest.cc @@ -1398,11 +1398,14 @@ TEST_F(RTCStatsCollectorTest, CollectRTCPeerConnectionStats) { report->Get("RTCPeerConnection")->cast_to()); } + // TODO(bugs.webrtc.org/11547): Supply a separate network thread. rtc::scoped_refptr dummy_channel_a = DataChannel::Create( - nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit()); + nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit(), + rtc::Thread::Current(), rtc::Thread::Current()); pc_->SignalDataChannelCreated()(dummy_channel_a.get()); rtc::scoped_refptr dummy_channel_b = DataChannel::Create( - nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit()); + nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit(), + rtc::Thread::Current(), rtc::Thread::Current()); pc_->SignalDataChannelCreated()(dummy_channel_b.get()); dummy_channel_a->SignalOpened(dummy_channel_a.get()); diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h index c6391583f5..f459552170 100644 --- a/pc/test/fake_peer_connection_for_stats.h +++ b/pc/test/fake_peer_connection_for_stats.h @@ -174,8 +174,10 @@ class FakePeerConnectionForStats : public FakePeerConnectionBase { void AddSctpDataChannel(const std::string& label, const InternalDataChannelInit& init) { - AddSctpDataChannel(DataChannel::Create(&data_channel_provider_, - cricket::DCT_SCTP, label, init)); + // TODO(bugs.webrtc.org/11547): Supply a separate network thread. + AddSctpDataChannel(DataChannel::Create( + &data_channel_provider_, cricket::DCT_SCTP, label, init, + rtc::Thread::Current(), rtc::Thread::Current())); } void AddSctpDataChannel(rtc::scoped_refptr data_channel) { diff --git a/pc/test/mock_data_channel.h b/pc/test/mock_data_channel.h index 63f0e6ce64..bc5f94da5f 100644 --- a/pc/test/mock_data_channel.h +++ b/pc/test/mock_data_channel.h @@ -31,11 +31,15 @@ class MockDataChannel : public rtc::RefCountedObject { uint64_t bytes_sent, uint32_t messages_received, uint64_t bytes_received, - const InternalDataChannelInit& config = InternalDataChannelInit()) + const InternalDataChannelInit& config = InternalDataChannelInit(), + rtc::Thread* signaling_thread = rtc::Thread::Current(), + rtc::Thread* network_thread = rtc::Thread::Current()) : rtc::RefCountedObject(config, nullptr, cricket::DCT_NONE, - label) { + label, + signaling_thread, + network_thread) { EXPECT_CALL(*this, id()).WillRepeatedly(::testing::Return(id)); EXPECT_CALL(*this, state()).WillRepeatedly(::testing::Return(state)); EXPECT_CALL(*this, protocol()).WillRepeatedly(::testing::Return(protocol));