diff --git a/media/sctp/sctptransport.cc b/media/sctp/sctptransport.cc index b3459858d7..51b4d499cf 100644 --- a/media/sctp/sctptransport.cc +++ b/media/sctp/sctptransport.cc @@ -23,6 +23,7 @@ enum PreservedErrno { #include #include +#include #include #include @@ -73,65 +74,6 @@ enum PayloadProtocolIdentifier { PPID_TEXT_LAST = 51 }; -typedef std::set StreamSet; - -// Returns a comma-separated, human-readable list of the stream IDs in 's' -std::string ListStreams(const StreamSet& s) { - std::stringstream result; - bool first = true; - for (StreamSet::const_iterator it = s.begin(); it != s.end(); ++it) { - if (!first) { - result << ", " << *it; - } else { - result << *it; - first = false; - } - } - return result.str(); -} - -// Returns a pipe-separated, human-readable list of the SCTP_STREAM_RESET -// flags in 'flags' -std::string ListFlags(int flags) { - std::stringstream result; - bool first = true; -// Skip past the first 12 chars (strlen("SCTP_STREAM_")) -#define MAKEFLAG(X) \ - { X, #X + 12 } - struct flaginfo_t { - int value; - const char* name; - } flaginfo[] = {MAKEFLAG(SCTP_STREAM_RESET_INCOMING_SSN), - MAKEFLAG(SCTP_STREAM_RESET_OUTGOING_SSN), - MAKEFLAG(SCTP_STREAM_RESET_DENIED), - MAKEFLAG(SCTP_STREAM_RESET_FAILED), - MAKEFLAG(SCTP_STREAM_CHANGE_DENIED)}; -#undef MAKEFLAG - for (uint32_t i = 0; i < arraysize(flaginfo); ++i) { - if (flags & flaginfo[i].value) { - if (!first) - result << " | "; - result << flaginfo[i].name; - first = false; - } - } - return result.str(); -} - -// Returns a comma-separated, human-readable list of the integers in 'array'. -// All 'num_elems' of them. -std::string ListArray(const uint16_t* array, int num_elems) { - std::stringstream result; - for (int i = 0; i < num_elems; ++i) { - if (i) { - result << ", " << array[i]; - } else { - result << array[i]; - } - } - return result.str(); -} - // Helper for logging SCTP messages. #if defined(__GNUC__) __attribute__((__format__(__printf__, 1, 2))) @@ -472,43 +414,40 @@ bool SctpTransport::OpenStream(int sid) { << "Not adding data stream " << "with sid=" << sid << " because sid is too high."; return false; - } else if (open_streams_.find(sid) != open_streams_.end()) { + } + auto it = stream_status_by_sid_.find(sid); + if (it == stream_status_by_sid_.end()) { + stream_status_by_sid_[sid] = StreamStatus(); + return true; + } + if (it->second.is_open()) { RTC_LOG(LS_WARNING) << debug_name_ << "->OpenStream(...): " << "Not adding data stream " << "with sid=" << sid << " because stream is already open."; return false; - } else if (queued_reset_streams_.find(sid) != queued_reset_streams_.end() || - sent_reset_streams_.find(sid) != sent_reset_streams_.end()) { + } else { RTC_LOG(LS_WARNING) << debug_name_ << "->OpenStream(...): " << "Not adding data stream " << " with sid=" << sid << " because stream is still closing."; return false; } - - open_streams_.insert(sid); - return true; } bool SctpTransport::ResetStream(int sid) { RTC_DCHECK_RUN_ON(network_thread_); - StreamSet::iterator found = open_streams_.find(sid); - if (found == open_streams_.end()) { - RTC_LOG(LS_WARNING) << debug_name_ << "->ResetStream(" << sid << "): " - << "stream not found."; + + auto it = stream_status_by_sid_.find(sid); + if (it == stream_status_by_sid_.end() || !it->second.is_open()) { + RTC_LOG(LS_WARNING) << debug_name_ << "->ResetStream(" << sid + << "): stream not open."; return false; - } else { - RTC_LOG(LS_VERBOSE) << debug_name_ << "->ResetStream(" << sid << "): " - << "Removing and queuing RE-CONFIG chunk."; - open_streams_.erase(found); } - // SCTP won't let you have more than one stream reset pending at a time, but - // you can close multiple streams in a single reset. So, we keep an internal - // queue of streams-to-reset, and send them as one reset message in - // SendQueuedStreamResets(). - queued_reset_streams_.insert(sid); + RTC_LOG(LS_VERBOSE) << debug_name_ << "->ResetStream(" << sid << "): " + << "Queuing RE-CONFIG chunk."; + it->second.closure_initiated = true; // Signal our stream-reset logic that it should try to send now, if it can. SendQueuedStreamResets(); @@ -534,12 +473,15 @@ bool SctpTransport::SendData(const SendDataParams& params, return false; } - if (params.type != DMT_CONTROL && - open_streams_.find(params.sid) == open_streams_.end()) { - RTC_LOG(LS_WARNING) << debug_name_ << "->SendData(...): " - << "Not sending data because sid is unknown: " - << params.sid; - return false; + if (params.type != DMT_CONTROL) { + auto it = stream_status_by_sid_.find(params.sid); + if (it == stream_status_by_sid_.end() || !it->second.is_open()) { + RTC_LOG(LS_WARNING) + << debug_name_ << "->SendData(...): " + << "Not sending data because sid is unknown or closing: " + << params.sid; + return false; + } } // Send data using SCTP. @@ -790,46 +732,63 @@ void SctpTransport::CloseSctpSocket() { bool SctpTransport::SendQueuedStreamResets() { RTC_DCHECK_RUN_ON(network_thread_); - if (!sent_reset_streams_.empty() || queued_reset_streams_.empty()) { + + // Figure out how many streams need to be reset. We need to do this so we can + // allocate the right amount of memory for the sctp_reset_streams structure. + size_t num_streams = std::count_if( + stream_status_by_sid_.begin(), stream_status_by_sid_.end(), + [](const std::map::value_type& stream) { + return stream.second.need_outgoing_reset(); + }); + if (num_streams == 0) { + // Nothing to reset. return true; } RTC_LOG(LS_VERBOSE) << "SendQueuedStreamResets[" << debug_name_ - << "]: Sending [" << ListStreams(queued_reset_streams_) - << "], Open: [" << ListStreams(open_streams_) - << "], Sent: [" << ListStreams(sent_reset_streams_) - << "]"; + << "]: Resetting " << num_streams << " outgoing streams."; - const size_t num_streams = queued_reset_streams_.size(); const size_t num_bytes = sizeof(struct sctp_reset_streams) + (num_streams * sizeof(uint16_t)); - std::vector reset_stream_buf(num_bytes, 0); struct sctp_reset_streams* resetp = reinterpret_cast(&reset_stream_buf[0]); resetp->srs_assoc_id = SCTP_ALL_ASSOC; - resetp->srs_flags = SCTP_STREAM_RESET_INCOMING | SCTP_STREAM_RESET_OUTGOING; + resetp->srs_flags = SCTP_STREAM_RESET_OUTGOING; resetp->srs_number_streams = rtc::checked_cast(num_streams); int result_idx = 0; - for (StreamSet::iterator it = queued_reset_streams_.begin(); - it != queued_reset_streams_.end(); ++it) { - resetp->srs_stream_list[result_idx++] = *it; + + for (const std::map::value_type& stream : + stream_status_by_sid_) { + if (!stream.second.need_outgoing_reset()) { + continue; + } + resetp->srs_stream_list[result_idx++] = stream.first; } int ret = usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_RESET_STREAMS, resetp, rtc::checked_cast(reset_stream_buf.size())); if (ret < 0) { - RTC_LOG_ERRNO(LS_ERROR) << debug_name_ - << "->SendQueuedStreamResets(): " - "Failed to send a stream reset for " - << num_streams << " streams"; + // Note that usrsctp only lets us have one reset in progress at a time + // (even though multiple streams can be reset at once). If this happens, + // SendQueuedStreamResets will end up called after the current in-progress + // reset finishes, in OnStreamResetEvent. + RTC_LOG_ERRNO(LS_WARNING) << debug_name_ + << "->SendQueuedStreamResets(): " + "Failed to send a stream reset for " + << num_streams << " streams"; return false; } - // sent_reset_streams_ is empty, and all the queued_reset_streams_ go into - // it now. - queued_reset_streams_.swap(sent_reset_streams_); + // Since the usrsctp call completed successfully, update our stream status + // map to note that we started the outgoing reset. + for (auto it = stream_status_by_sid_.begin(); + it != stream_status_by_sid_.end(); ++it) { + if (it->second.need_outgoing_reset()) { + it->second.outgoing_reset_initiated = true; + } + } return true; } @@ -1049,78 +1008,73 @@ void SctpTransport::OnNotificationAssocChange(const sctp_assoc_change& change) { void SctpTransport::OnStreamResetEvent( const struct sctp_stream_reset_event* evt) { RTC_DCHECK_RUN_ON(network_thread_); - // A stream reset always involves two RE-CONFIG chunks for us -- we always - // simultaneously reset a sid's sequence number in both directions. The - // requesting side transmits a RE-CONFIG chunk and waits for the peer to send - // one back. Both sides get this SCTP_STREAM_RESET_EVENT when they receive - // RE-CONFIGs. + + // This callback indicates that a reset is complete for incoming and/or + // outgoing streams. The reset may have been initiated by us or the remote + // side. const int num_sids = (evt->strreset_length - sizeof(*evt)) / sizeof(evt->strreset_stream_list[0]); - RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ - << "): Flags = 0x" << rtc::ToHex(evt->strreset_flags) - << " (" << ListFlags(evt->strreset_flags) << ")"; - RTC_LOG(LS_VERBOSE) << "Assoc = " << evt->strreset_assoc_id << ", Streams = [" - << ListArray(evt->strreset_stream_list, num_sids) - << "], Open: [" << ListStreams(open_streams_) - << "], Q'd: [" << ListStreams(queued_reset_streams_) - << "], Sent: [" << ListStreams(sent_reset_streams_) - << "]"; - // If both sides try to reset some streams at the same time (even if they're - // disjoint sets), we can get reset failures. if (evt->strreset_flags & SCTP_STREAM_RESET_FAILED) { - // OK, just try again. The stream IDs sent over when the RESET_FAILED flag - // is set seem to be garbage values. Ignore them. - queued_reset_streams_.insert(sent_reset_streams_.begin(), - sent_reset_streams_.end()); - sent_reset_streams_.clear(); + // OK, just try sending any previously sent stream resets again. The stream + // IDs sent over when the RESET_FIALED flag is set seem to be garbage + // values. Ignore them. + for (std::map::value_type& stream : + stream_status_by_sid_) { + stream.second.outgoing_reset_initiated = false; + } + SendQueuedStreamResets(); + // TODO(deadbeef): If this happens, the entire SCTP association is in quite + // crippled state. The SCTP session should be dismantled, and the WebRTC + // connectivity errored because is clear that the distant party is not + // playing ball: malforms the transported data. + return; + } - } else if (evt->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { - // Each side gets an event for each direction of a stream. That is, - // closing sid k will make each side receive INCOMING and OUTGOING reset - // events for k. As per RFC6525, Section 5, paragraph 2, each side will - // get an INCOMING event first. - for (int i = 0; i < num_sids; i++) { - const int stream_id = evt->strreset_stream_list[i]; + // Loop over the received events and properly update the StreamStatus map. + for (int i = 0; i < num_sids; i++) { + const uint32_t sid = evt->strreset_stream_list[i]; + auto it = stream_status_by_sid_.find(sid); + if (it == stream_status_by_sid_.end()) { + // This stream is unknown. Sometimes this can be from a + // RESET_FAILED-related retransmit. + RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ + << "): Unknown sid " << sid; + continue; + } + StreamStatus& status = it->second; - // See if this stream ID was closed by our peer or ourselves. - StreamSet::iterator it = sent_reset_streams_.find(stream_id); - - // The reset was requested locally. - if (it != sent_reset_streams_.end()) { - RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ - << "): local sid " << stream_id << " acknowledged."; - sent_reset_streams_.erase(it); - - } else if ((it = open_streams_.find(stream_id)) != open_streams_.end()) { - // The peer requested the reset. - RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ - << "): closing sid " << stream_id; - open_streams_.erase(it); - SignalStreamClosedRemotely(stream_id); - - } else if ((it = queued_reset_streams_.find(stream_id)) != - queued_reset_streams_.end()) { - // The peer requested the reset, but there was a local reset - // queued. - RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ - << "): double-sided close for sid " << stream_id; - // Both sides want the stream closed, and the peer got to send the - // RE-CONFIG first. Treat it like the local Remove(Send|Recv)Stream - // finished quickly. - queued_reset_streams_.erase(it); - - } else { - // This stream is unknown. Sometimes this can be from an - // RESET_FAILED-related retransmit. - RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_ - << "): Unknown sid " << stream_id; + if (evt->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) { + RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_INCOMING_SSN(" << debug_name_ + << "): sid " << sid; + status.incoming_reset_complete = true; + // If we receive an incoming stream reset and we haven't started the + // closing procedure ourselves, this means the remote side started the + // closing procedure; fire a signal so that the relevant data channel + // can change to "closing" (we still need to reset the outgoing stream + // before it changes to "closed"). + if (!status.closure_initiated) { + SignalClosingProcedureStartedRemotely(sid); } } + if (evt->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) { + RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_OUTGOING_SSN(" << debug_name_ + << "): sid " << sid; + status.outgoing_reset_complete = true; + } + + // If this reset completes the closing procedure, remove the stream from + // our map so we can consider it closed, and fire a signal such that the + // relevant DataChannel will change its state to "closed" and its ID can be + // re-used. + if (status.reset_complete()) { + stream_status_by_sid_.erase(it); + SignalClosingProcedureComplete(sid); + } } - // Always try to send the queued RESET because this call indicates that the - // last local RESET or remote RESET has made some progress. + // Always try to send any queued resets because this call indicates that the + // last outgoing or incoming reset has made some progress. SendQueuedStreamResets(); } diff --git a/media/sctp/sctptransport.h b/media/sctp/sctptransport.h index 45132e2414..b2d7084d9e 100644 --- a/media/sctp/sctptransport.h +++ b/media/sctp/sctptransport.h @@ -13,6 +13,7 @@ #include +#include #include // for unique_ptr. #include #include @@ -152,17 +153,48 @@ class SctpTransport : public SctpTransportInternal, // congestion control)? Different than |transport_|'s "ready to send". bool ready_to_send_data_ = false; - typedef std::set StreamSet; - // When a data channel opens a stream, it goes into open_streams_. When we - // want to close it, the stream's ID goes into queued_reset_streams_. When - // we actually transmit a RE-CONFIG chunk with that stream ID, the ID goes - // into sent_reset_streams_. When we get a response RE-CONFIG chunk back - // acknowledging the reset, we remove the stream ID from - // sent_reset_streams_. We use sent_reset_streams_ to differentiate - // between acknowledgment RE-CONFIG and peer-initiated RE-CONFIGs. - StreamSet open_streams_; - StreamSet queued_reset_streams_; - StreamSet sent_reset_streams_; + // Used to keep track of the status of each stream (or rather, each pair of + // incoming/outgoing streams with matching IDs). It's specifically used to + // keep track of the status of resets, but more information could be put here + // later. + // + // See datachannel.h for a summary of the closing procedure. + struct StreamStatus { + // Closure initiated by application via ResetStream? Note that + // this may be true while outgoing_reset_initiated is false if the outgoing + // reset needed to be queued. + bool closure_initiated = false; + // Whether we've initiated the outgoing stream reset via + // SCTP_RESET_STREAMS. + bool outgoing_reset_initiated = false; + // Whether usrsctp has indicated that the incoming/outgoing streams have + // been reset. It's expected that the peer will reset its outgoing stream + // (our incoming stream) after receiving the reset for our outgoing stream, + // though older versions of chromium won't do this. See crbug.com/559394 + // for context. + bool outgoing_reset_complete = false; + bool incoming_reset_complete = false; + + // Some helper methods to improve code readability. + bool is_open() const { + return !closure_initiated && !incoming_reset_complete && + !outgoing_reset_complete; + } + // We need to send an outgoing reset if the application has closed the data + // channel, or if we received a reset of the incoming stream from the + // remote endpoint, indicating the data channel was closed remotely. + bool need_outgoing_reset() const { + return (incoming_reset_complete || closure_initiated) && + !outgoing_reset_initiated; + } + bool reset_complete() const { + return outgoing_reset_complete && incoming_reset_complete; + } + }; + + // Entries should only be removed from this map if |reset_complete| is + // true. + std::map stream_status_by_sid_; // A static human-readable name for debugging messages. const char* debug_name_ = "SctpTransport"; diff --git a/media/sctp/sctptransport_unittest.cc b/media/sctp/sctptransport_unittest.cc index 9864d7af0f..86f45531d8 100644 --- a/media/sctp/sctptransport_unittest.cc +++ b/media/sctp/sctptransport_unittest.cc @@ -67,56 +67,54 @@ class SctpFakeDataReceiver : public sigslot::has_slots<> { ReceiveDataParams last_params_; }; -class SignalReadyToSendObserver : public sigslot::has_slots<> { +class SctpTransportObserver : public sigslot::has_slots<> { public: - SignalReadyToSendObserver() : signaled_(false) {} - - void OnSignaled() { signaled_ = true; } - - bool IsSignaled() { return signaled_; } - - private: - bool signaled_; -}; - -class SignalTransportClosedObserver : public sigslot::has_slots<> { - public: - SignalTransportClosedObserver() {} - void BindSelf(SctpTransport* transport) { - transport->SignalStreamClosedRemotely.connect( - this, &SignalTransportClosedObserver::OnStreamClosed); + explicit SctpTransportObserver(SctpTransport* transport) { + transport->SignalClosingProcedureComplete.connect( + this, &SctpTransportObserver::OnClosingProcedureComplete); + transport->SignalReadyToSendData.connect( + this, &SctpTransportObserver::OnReadyToSend); } - void OnStreamClosed(int stream) { streams_.push_back(stream); } int StreamCloseCount(int stream) { - return std::count(streams_.begin(), streams_.end(), stream); + return std::count(closed_streams_.begin(), closed_streams_.end(), stream); } bool WasStreamClosed(int stream) { - return std::find(streams_.begin(), streams_.end(), stream) != - streams_.end(); + return std::find(closed_streams_.begin(), closed_streams_.end(), stream) != + closed_streams_.end(); } + bool ReadyToSend() { return ready_to_send_; } + private: - std::vector streams_; + void OnClosingProcedureComplete(int stream) { + closed_streams_.push_back(stream); + } + void OnReadyToSend() { ready_to_send_ = true; } + + std::vector closed_streams_; + bool ready_to_send_ = false; }; +// Helper class used to immediately attempt to reopen a stream as soon as it's +// been closed. class SignalTransportClosedReopener : public sigslot::has_slots<> { public: SignalTransportClosedReopener(SctpTransport* transport, SctpTransport* peer) : transport_(transport), peer_(peer) {} + int StreamCloseCount(int stream) { + return std::count(streams_.begin(), streams_.end(), stream); + } + + private: void OnStreamClosed(int stream) { transport_->OpenStream(stream); peer_->OpenStream(stream); streams_.push_back(stream); } - int StreamCloseCount(int stream) { - return std::count(streams_.begin(), streams_.end(), stream); - } - - private: SctpTransport* transport_; SctpTransport* peer_; std::vector streams_; @@ -356,14 +354,11 @@ TEST_F(SctpTransportTest, SignalReadyToSendDataAfterDtlsWritable) { FakeDtlsTransport fake_dtls("fake dtls", 0); SctpFakeDataReceiver recv; std::unique_ptr transport(CreateTransport(&fake_dtls, &recv)); - - SignalReadyToSendObserver signal_observer; - transport->SignalReadyToSendData.connect( - &signal_observer, &SignalReadyToSendObserver::OnSignaled); + SctpTransportObserver observer(transport.get()); transport->Start(kSctpDefaultPort, kSctpDefaultPort); fake_dtls.SetWritable(true); - EXPECT_TRUE_WAIT(signal_observer.IsSignaled(), kDefaultTimeout); + EXPECT_TRUE_WAIT(observer.ReadyToSend(), kDefaultTimeout); } // Test that after an SCTP socket's buffer is filled, SignalReadyToSendData @@ -476,10 +471,8 @@ TEST_F(SctpTransportTest, SendDataHighPorts) { TEST_F(SctpTransportTest, ClosesRemoteStream) { SetupConnectedTransportsWithTwoStreams(); - SignalTransportClosedObserver transport1_sig_receiver, - transport2_sig_receiver; - transport1_sig_receiver.BindSelf(transport1()); - transport2_sig_receiver.BindSelf(transport2()); + SctpTransportObserver transport1_observer(transport1()); + SctpTransportObserver transport2_observer(transport2()); SendDataResult result; ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result)); @@ -490,18 +483,16 @@ TEST_F(SctpTransportTest, ClosesRemoteStream) { EXPECT_TRUE_WAIT(ReceivedData(receiver1(), 2, "hi transport1"), kDefaultTimeout); - // Close transport 1. Transport 2 should notify us. + // Close stream 1 on transport 1. Transport 2 should notify us. transport1()->ResetStream(1); - EXPECT_TRUE_WAIT(transport2_sig_receiver.WasStreamClosed(1), kDefaultTimeout); + EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout); } TEST_F(SctpTransportTest, ClosesTwoRemoteStreams) { SetupConnectedTransportsWithTwoStreams(); AddStream(3); - SignalTransportClosedObserver transport1_sig_receiver, - transport2_sig_receiver; - transport1_sig_receiver.BindSelf(transport1()); - transport2_sig_receiver.BindSelf(transport2()); + SctpTransportObserver transport1_observer(transport1()); + SctpTransportObserver transport2_observer(transport2()); SendDataResult result; ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result)); @@ -515,18 +506,16 @@ TEST_F(SctpTransportTest, ClosesTwoRemoteStreams) { // Close two streams on one side. transport2()->ResetStream(2); transport2()->ResetStream(3); - EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(2), kDefaultTimeout); - EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(3), kDefaultTimeout); + EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(2), kDefaultTimeout); + EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(3), kDefaultTimeout); } TEST_F(SctpTransportTest, ClosesStreamsOnBothSides) { SetupConnectedTransportsWithTwoStreams(); AddStream(3); AddStream(4); - SignalTransportClosedObserver transport1_sig_receiver, - transport2_sig_receiver; - transport1_sig_receiver.BindSelf(transport1()); - transport2_sig_receiver.BindSelf(transport2()); + SctpTransportObserver transport1_observer(transport1()); + SctpTransportObserver transport2_observer(transport2()); SendDataResult result; ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result)); @@ -545,10 +534,10 @@ TEST_F(SctpTransportTest, ClosesStreamsOnBothSides) { transport2()->ResetStream(2); transport2()->ResetStream(3); transport2()->ResetStream(4); - EXPECT_TRUE_WAIT(transport2_sig_receiver.WasStreamClosed(1), kDefaultTimeout); - EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(2), kDefaultTimeout); - EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(3), kDefaultTimeout); - EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(4), kDefaultTimeout); + EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout); + EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(2), kDefaultTimeout); + EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(3), kDefaultTimeout); + EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(4), kDefaultTimeout); } TEST_F(SctpTransportTest, RefusesHighNumberedTransports) { @@ -557,20 +546,18 @@ TEST_F(SctpTransportTest, RefusesHighNumberedTransports) { EXPECT_FALSE(AddStream(kMaxSctpSid + 1)); } -// Flaky, see webrtc:4453. -TEST_F(SctpTransportTest, DISABLED_ReusesAStream) { +TEST_F(SctpTransportTest, ReusesAStream) { // Shut down transport 1, then open it up again for reuse. SetupConnectedTransportsWithTwoStreams(); SendDataResult result; - SignalTransportClosedObserver transport2_sig_receiver; - transport2_sig_receiver.BindSelf(transport2()); + SctpTransportObserver transport2_observer(transport2()); ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result)); EXPECT_EQ(SDR_SUCCESS, result); EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hello?"), kDefaultTimeout); transport1()->ResetStream(1); - EXPECT_TRUE_WAIT(transport2_sig_receiver.WasStreamClosed(1), kDefaultTimeout); + EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout); // Transport 1 is gone now. // Create a new transport 1. @@ -579,8 +566,7 @@ TEST_F(SctpTransportTest, DISABLED_ReusesAStream) { EXPECT_EQ(SDR_SUCCESS, result); EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hi?"), kDefaultTimeout); transport1()->ResetStream(1); - EXPECT_TRUE_WAIT(transport2_sig_receiver.StreamCloseCount(1) == 2, - kDefaultTimeout); + EXPECT_EQ_WAIT(2, transport2_observer.StreamCloseCount(1), kDefaultTimeout); } } // namespace cricket diff --git a/media/sctp/sctptransportinternal.h b/media/sctp/sctptransportinternal.h index 9168320b2b..0380a870b4 100644 --- a/media/sctp/sctptransportinternal.h +++ b/media/sctp/sctptransportinternal.h @@ -85,10 +85,10 @@ class SctpTransportInternal { // used" part. See: // https://bugs.chromium.org/p/chromium/issues/detail?id=619849 virtual bool OpenStream(int sid) = 0; - // The inverse of OpenStream. When this method returns, the reset process may - // have not finished but it will have begun. - // TODO(deadbeef): We need a way to tell when it's done. See: - // https://bugs.chromium.org/p/webrtc/issues/detail?id=4453 + // The inverse of OpenStream. Begins the closing procedure, which will + // eventually result in SignalClosingProcedureComplete on the side that + // initiates it, and both SignalClosingProcedureStartedRemotely and + // SignalClosingProcedureComplete on the other side. virtual bool ResetStream(int sid) = 0; // Send data down this channel (will be wrapped as SCTP packets then given to // usrsctp that will then post the network interface). @@ -111,8 +111,14 @@ class SctpTransportInternal { // contains message payload. sigslot::signal2 SignalDataReceived; - // Parameter is SID of closed stream. - sigslot::signal1 SignalStreamClosedRemotely; + // Parameter is SID; fired when we receive an incoming stream reset on an + // open stream, indicating that the other side started the closing procedure. + // After resetting the outgoing stream, SignalClosingProcedureComplete will + // fire too. + sigslot::signal1 SignalClosingProcedureStartedRemotely; + // Parameter is SID; fired when closing procedure is complete (both incoming + // and outgoing streams reset). + sigslot::signal1 SignalClosingProcedureComplete; // Helper for debugging. virtual void set_debug_name_for_testing(const char* debug_name) = 0; diff --git a/pc/datachannel.cc b/pc/datachannel.cc index 525b6f9508..b7a717c34e 100644 --- a/pc/datachannel.cc +++ b/pc/datachannel.cc @@ -229,6 +229,7 @@ void DataChannel::Close() { send_ssrc_ = 0; send_ssrc_set_ = false; SetState(kClosing); + // Will send queued data before beginning the underlying closing procedure. UpdateState(); } @@ -251,7 +252,9 @@ bool DataChannel::Send(const DataBuffer& buffer) { // blocked. RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP); if (!QueueSendDataMessage(buffer)) { - Close(); + RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to queue " + "additional data."; + CloseAbruptly(); } return true; } @@ -276,11 +279,6 @@ void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) { UpdateState(); } -// The remote peer request that this channel shall be closed. -void DataChannel::RemotePeerRequestClose() { - DoClose(); -} - void DataChannel::SetSctpSid(int sid) { RTC_DCHECK_LT(config_.id, 0); RTC_DCHECK_GE(sid, 0); @@ -293,6 +291,33 @@ void DataChannel::SetSctpSid(int sid) { provider_->AddSctpDataStream(sid); } +void DataChannel::OnClosingProcedureStartedRemotely(int sid) { + if (data_channel_type_ == cricket::DCT_SCTP && sid == config_.id && + state_ != kClosing && state_ != kClosed) { + // Don't bother sending queued data since the side that initiated the + // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy + // discussion about this. + queued_send_data_.Clear(); + queued_control_data_.Clear(); + // Just need to change state to kClosing, SctpTransport will handle the + // rest of the closing procedure and OnClosingProcedureComplete will be + // called later. + started_closing_procedure_ = true; + SetState(kClosing); + } +} + +void DataChannel::OnClosingProcedureComplete(int sid) { + if (data_channel_type_ == cricket::DCT_SCTP && sid == config_.id) { + // If the closing procedure is complete, we should have finished sending + // all pending data and transitioned to kClosing already. + RTC_DCHECK_EQ(state_, kClosing); + RTC_DCHECK(queued_send_data_.Empty()); + DisconnectFromProvider(); + SetState(kClosed); + } +} + void DataChannel::OnTransportChannelCreated() { RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP); if (!connected_to_provider_) { @@ -306,11 +331,15 @@ void DataChannel::OnTransportChannelCreated() { } void DataChannel::OnTransportChannelDestroyed() { - // This method needs to synchronously close the data channel, which means any - // queued data needs to be discarded. - queued_send_data_.Clear(); - queued_control_data_.Clear(); - DoClose(); + // The SctpTransport is going away (for example, because the SCTP m= section + // was rejected), so we need to close abruptly. + CloseAbruptly(); +} + +// The remote peer request that this channel shall be closed. +void DataChannel::RemotePeerRequestClose() { + RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP); + CloseAbruptly(); } void DataChannel::SetSendSsrc(uint32_t send_ssrc) { @@ -387,7 +416,7 @@ void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, queued_received_data_.Clear(); if (data_channel_type_ != cricket::DCT_RTP) { - Close(); + CloseAbruptly(); } return; @@ -396,12 +425,6 @@ void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params, } } -void DataChannel::OnStreamClosedRemotely(int sid) { - if (data_channel_type_ == cricket::DCT_SCTP && sid == config_.id) { - Close(); - } -} - void DataChannel::OnChannelReady(bool writable) { writable_ = writable; if (!writable) { @@ -413,14 +436,23 @@ void DataChannel::OnChannelReady(bool writable) { UpdateState(); } -void DataChannel::DoClose() { - if (state_ == kClosed) +void DataChannel::CloseAbruptly() { + if (state_ == kClosed) { return; + } - receive_ssrc_set_ = false; - send_ssrc_set_ = false; + if (connected_to_provider_) { + DisconnectFromProvider(); + } + + // Closing abruptly means any queued data gets thrown away. + queued_send_data_.Clear(); + queued_control_data_.Clear(); + + // Still go to "kClosing" before "kClosed", since observers may be expecting + // that. SetState(kClosing); - UpdateState(); + SetState(kClosed); } void DataChannel::UpdateState() { @@ -460,13 +492,28 @@ void DataChannel::UpdateState() { break; } case kClosing: { + // Wait for all queued data to be sent before beginning the closing + // procedure. if (queued_send_data_.Empty() && queued_control_data_.Empty()) { - if (connected_to_provider_) { - DisconnectFromProvider(); - } - - if (!connected_to_provider_ && !send_ssrc_set_ && !receive_ssrc_set_) { - SetState(kClosed); + if (data_channel_type_ == cricket::DCT_RTP) { + // For RTP data channels, we can go to "closed" after we finish + // sending data and the send/recv SSRCs are unset. + if (connected_to_provider_) { + DisconnectFromProvider(); + } + if (!send_ssrc_set_ && !receive_ssrc_set_) { + SetState(kClosed); + } + } else { + // For SCTP data channels, we need to wait for the closing procedure + // to complete; after calling RemoveSctpDataStream, + // OnClosingProcedureComplete will end up called asynchronously + // afterwards. + if (connected_to_provider_ && !started_closing_procedure_ && + config_.id >= 0) { + started_closing_procedure_ = true; + provider_->RemoveSctpDataStream(config_.id); + } } } break; @@ -498,10 +545,6 @@ void DataChannel::DisconnectFromProvider() { provider_->DisconnectDataChannel(this); connected_to_provider_ = false; - - if (data_channel_type_ == cricket::DCT_SCTP && config_.id >= 0) { - provider_->RemoveSctpDataStream(config_.id); - } } void DataChannel::DeliverQueuedReceivedData() { @@ -586,7 +629,7 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, " "send_result = " << send_result; - Close(); + CloseAbruptly(); return false; } @@ -653,7 +696,7 @@ bool DataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) { RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send" " the CONTROL message, send_result = " << send_result; - Close(); + CloseAbruptly(); } return retval; } diff --git a/pc/datachannel.h b/pc/datachannel.h index 1af90dab6e..bd7ea1a3d9 100644 --- a/pc/datachannel.h +++ b/pc/datachannel.h @@ -27,6 +27,9 @@ namespace webrtc { class DataChannel; +// TODO(deadbeef): Once RTP data channels go away, get rid of this and have +// DataChannel depend on SctpTransportInternal (pure virtual SctpTransport +// interface) instead. class DataChannelProviderInterface { public: // Sends the data to the transport. @@ -39,7 +42,8 @@ class DataChannelProviderInterface { virtual void DisconnectDataChannel(DataChannel* data_channel) = 0; // Adds the data channel SID to the transport for SCTP. virtual void AddSctpDataStream(int sid) = 0; - // Removes the data channel SID from the transport for SCTP. + // Begins the closing procedure by sending an outgoing stream reset. Still + // need to wait for callbacks to tell when this completes. virtual void RemoveSctpDataStream(int sid) = 0; // Returns true if the transport channel is ready to send data. virtual bool ReadyToSendData() const = 0; @@ -73,7 +77,7 @@ class SctpSidAllocator { // Gets the first unused odd/even id based on the DTLS role. If |role| is // SSL_CLIENT, the allocated id starts from 0 and takes even numbers; // otherwise, the id starts from 1 and takes odd numbers. - // Returns false if no id can be allocated. + // Returns false if no ID can be allocated. bool AllocateSid(rtc::SSLRole role, int* sid); // Attempts to reserve a specific sid. Returns false if it's unavailable. @@ -92,7 +96,7 @@ class SctpSidAllocator { // DataChannel is a an implementation of the DataChannelInterface based on // libjingle's data engine. It provides an implementation of unreliable or // reliabledata channels. Currently this class is specifically designed to use -// both RtpDataEngine and SctpDataEngine. +// both RtpDataChannel and SctpTransport. // DataChannel states: // kConnecting: The channel has been created the transport might not yet be @@ -104,6 +108,16 @@ class SctpSidAllocator { // has been called with SSRC==0 // kClosed: Both UpdateReceiveSsrc and UpdateSendSsrc has been called with // SSRC==0. +// +// How the closing procedure works for SCTP: +// 1. Alice calls Close(), state changes to kClosing. +// 2. Alice finishes sending any queued data. +// 3. Alice calls RemoveSctpDataStream, sends outgoing stream reset. +// 4. Bob receives incoming stream reset; OnClosingProcedureStartedRemotely +// called. +// 5. Bob sends outgoing stream reset. 6. Alice receives incoming reset, +// Bob receives acknowledgement. Both receive OnClosingProcedureComplete +// callback and transition to kClosed. class DataChannel : public DataChannelInterface, public sigslot::has_slots<>, public rtc::MessageHandler { @@ -147,16 +161,21 @@ class DataChannel : public DataChannelInterface, // Slots for provider to connect signals to. void OnDataReceived(const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& payload); - void OnStreamClosedRemotely(int sid); - // The remote peer request that this channel should be closed. - void RemotePeerRequestClose(); - - // The following methods are for SCTP only. + /******************************************** + * The following methods are for SCTP only. * + ********************************************/ // Sets the SCTP sid and adds to transport layer if not set yet. Should only // be called once. void SetSctpSid(int sid); + // The remote side started the closing procedure by resetting its outgoing + // stream (our incoming stream). Sets state to kClosing. + void OnClosingProcedureStartedRemotely(int sid); + // The closing procedure is complete; both incoming and outgoing stream + // resets are done and the channel can transition to kClosed. Called + // asynchronously after RemoveSctpDataStream. + void OnClosingProcedureComplete(int sid); // Called when the transport channel is created. // Only needs to be called for SCTP data channels. void OnTransportChannelCreated(); @@ -165,8 +184,12 @@ class DataChannel : public DataChannelInterface, // to kClosed. void OnTransportChannelDestroyed(); - // The following methods are for RTP only. + /******************************************* + * The following methods are for RTP only. * + *******************************************/ + // The remote peer requested that this channel should be closed. + void RemotePeerRequestClose(); // Set the SSRC this channel should use to send data on the // underlying data engine. |send_ssrc| == 0 means that the channel is no // longer part of the session negotiation. @@ -231,7 +254,11 @@ class DataChannel : public DataChannelInterface, }; bool Init(const InternalDataChannelInit& config); - void DoClose(); + // Close immediately, ignoring any queued data or closing procedure. + // This is called for RTP data channels when SDP indicates a channel should + // be removed, or SCTP data channels when the underlying SctpTransport is + // being destroyed. + void CloseAbruptly(); void UpdateState(); void SetState(DataState state); void DisconnectFromProvider(); @@ -261,6 +288,8 @@ class DataChannel : public DataChannelInterface, bool send_ssrc_set_; bool receive_ssrc_set_; bool writable_; + // Did we already start the graceful SCTP closing procedure? + bool started_closing_procedure_ = false; uint32_t send_ssrc_; uint32_t receive_ssrc_; // Control messages that always have to get sent out before any queued diff --git a/pc/datachannel_unittest.cc b/pc/datachannel_unittest.cc index f8e98808d5..4a713cb97b 100644 --- a/pc/datachannel_unittest.cc +++ b/pc/datachannel_unittest.cc @@ -66,6 +66,9 @@ class FakeDataChannelObserver : public webrtc::DataChannelObserver { size_t on_buffered_amount_change_count_; }; +// TODO(deadbeef): The fact that these tests use a fake provider makes them not +// too valuable. Should rewrite using the +// peerconnection_datachannel_unittest.cc infrastructure. class SctpDataChannelTest : public testing::Test { protected: SctpDataChannelTest() @@ -560,29 +563,6 @@ TEST_F(SctpDataChannelTest, ClosedOnTransportError) { webrtc_data_channel_->state()); } -// Tests that a already closed DataChannel does not fire onStateChange again. -TEST_F(SctpDataChannelTest, ClosedDataChannelDoesNotFireOnStateChange) { - AddObserver(); - webrtc_data_channel_->Close(); - // OnStateChange called for kClosing and kClosed. - EXPECT_EQ(2U, observer_->on_state_change_count()); - - observer_->ResetOnStateChangeCount(); - webrtc_data_channel_->RemotePeerRequestClose(); - EXPECT_EQ(0U, observer_->on_state_change_count()); -} - -// Tests that RemotePeerRequestClose closes the local DataChannel. -TEST_F(SctpDataChannelTest, RemotePeerRequestClose) { - AddObserver(); - webrtc_data_channel_->RemotePeerRequestClose(); - - // OnStateChange called for kClosing and kClosed. - EXPECT_EQ(2U, observer_->on_state_change_count()); - EXPECT_EQ(webrtc::DataChannelInterface::kClosed, - webrtc_data_channel_->state()); -} - // Tests that the DataChannel is closed if the received buffer is full. TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) { SetChannelReady(); diff --git a/pc/peerconnection.cc b/pc/peerconnection.cc index 40aced4f95..0a984a703f 100644 --- a/pc/peerconnection.cc +++ b/pc/peerconnection.cc @@ -4479,6 +4479,8 @@ void PeerConnection::OnSctpDataChannelClosed(DataChannel* channel) { ++it) { if (it->get() == channel) { if (channel->id() >= 0) { + // After the closing procedure is done, it's safe to use this ID for + // another data channel. sid_allocator_.ReleaseSid(channel->id()); } // Since this method is triggered by a signal from the DataChannel, @@ -5062,8 +5064,10 @@ bool PeerConnection::ConnectDataChannel(DataChannel* webrtc_data_channel) { &DataChannel::OnChannelReady); SignalSctpDataReceived.connect(webrtc_data_channel, &DataChannel::OnDataReceived); - SignalSctpStreamClosedRemotely.connect( - webrtc_data_channel, &DataChannel::OnStreamClosedRemotely); + SignalSctpClosingProcedureStartedRemotely.connect( + webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely); + SignalSctpClosingProcedureComplete.connect( + webrtc_data_channel, &DataChannel::OnClosingProcedureComplete); } return true; } @@ -5081,7 +5085,8 @@ void PeerConnection::DisconnectDataChannel(DataChannel* webrtc_data_channel) { } else { SignalSctpReadyToSendData.disconnect(webrtc_data_channel); SignalSctpDataReceived.disconnect(webrtc_data_channel); - SignalSctpStreamClosedRemotely.disconnect(webrtc_data_channel); + SignalSctpClosingProcedureStartedRemotely.disconnect(webrtc_data_channel); + SignalSctpClosingProcedureComplete.disconnect(webrtc_data_channel); } } @@ -5601,8 +5606,14 @@ bool PeerConnection::CreateSctpTransport_n(const std::string& mid) { this, &PeerConnection::OnSctpTransportReadyToSendData_n); sctp_transport_->SignalDataReceived.connect( this, &PeerConnection::OnSctpTransportDataReceived_n); - sctp_transport_->SignalStreamClosedRemotely.connect( - this, &PeerConnection::OnSctpStreamClosedRemotely_n); + // TODO(deadbeef): All we do here is AsyncInvoke to fire the signal on + // another thread. Would be nice if there was a helper class similar to + // sigslot::repeater that did this for us, eliminating a bunch of boilerplate + // code. + sctp_transport_->SignalClosingProcedureStartedRemotely.connect( + this, &PeerConnection::OnSctpClosingProcedureStartedRemotely_n); + sctp_transport_->SignalClosingProcedureComplete.connect( + this, &PeerConnection::OnSctpClosingProcedureComplete_n); sctp_mid_ = mid; sctp_transport_->SetDtlsTransport(dtls_transport); return true; @@ -5674,13 +5685,22 @@ void PeerConnection::OnSctpTransportDataReceived_s( } } -void PeerConnection::OnSctpStreamClosedRemotely_n(int sid) { +void PeerConnection::OnSctpClosingProcedureStartedRemotely_n(int sid) { RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP); RTC_DCHECK(network_thread()->IsCurrent()); sctp_invoker_->AsyncInvoke( RTC_FROM_HERE, signaling_thread(), rtc::Bind(&sigslot::signal1::operator(), - &SignalSctpStreamClosedRemotely, sid)); + &SignalSctpClosingProcedureStartedRemotely, sid)); +} + +void PeerConnection::OnSctpClosingProcedureComplete_n(int sid) { + RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP); + RTC_DCHECK(network_thread()->IsCurrent()); + sctp_invoker_->AsyncInvoke( + RTC_FROM_HERE, signaling_thread(), + rtc::Bind(&sigslot::signal1::operator(), + &SignalSctpClosingProcedureComplete, sid)); } // Returns false if bundle is enabled and rtcp_mux is disabled. diff --git a/pc/peerconnection.h b/pc/peerconnection.h index 8bcee7a0e1..f753955f1c 100644 --- a/pc/peerconnection.h +++ b/pc/peerconnection.h @@ -811,7 +811,8 @@ class PeerConnection : public PeerConnectionInternal, // CONTROL messages on unused SIDs and processes them as OPEN messages. void OnSctpTransportDataReceived_s(const cricket::ReceiveDataParams& params, const rtc::CopyOnWriteBuffer& payload); - void OnSctpStreamClosedRemotely_n(int sid); + void OnSctpClosingProcedureStartedRemotely_n(int sid); + void OnSctpClosingProcedureComplete_n(int sid); bool ValidateBundleSettings(const cricket::SessionDescription* desc); bool HasRtcpMuxEnabled(const cricket::ContentInfo* content); @@ -993,7 +994,8 @@ class PeerConnection : public PeerConnectionInternal, sigslot::signal2 SignalSctpDataReceived; - sigslot::signal1 SignalSctpStreamClosedRemotely; + sigslot::signal1 SignalSctpClosingProcedureStartedRemotely; + sigslot::signal1 SignalSctpClosingProcedureComplete; std::unique_ptr current_local_description_; std::unique_ptr pending_local_description_; diff --git a/pc/peerconnectionendtoend_unittest.cc b/pc/peerconnectionendtoend_unittest.cc index c57032db4f..2f5631ac0d 100644 --- a/pc/peerconnectionendtoend_unittest.cc +++ b/pc/peerconnectionendtoend_unittest.cc @@ -168,7 +168,7 @@ class PeerConnectionEndToEndBaseTest : public sigslot::has_slots<>, size_t remote_dc_index) { EXPECT_EQ_WAIT(DataChannelInterface::kOpen, local_dc->state(), kMaxWait); - EXPECT_TRUE_WAIT(remote_dc_list.size() > remote_dc_index, kMaxWait); + ASSERT_TRUE_WAIT(remote_dc_list.size() > remote_dc_index, kMaxWait); EXPECT_EQ_WAIT(DataChannelInterface::kOpen, remote_dc_list[remote_dc_index]->state(), kMaxWait); @@ -605,14 +605,10 @@ TEST_P(PeerConnectionEndToEndTest, // Verifies that a DataChannel added from an OPEN message functions after // a channel has been previously closed (webrtc issue 3778). -// This previously failed because the new channel re-uses the ID of the closed -// channel, and the closed channel was incorrectly still assigned to the id. -// TODO(deadbeef): This is disabled because there's currently a race condition -// caused by the fact that a data channel signals that it's closed before it -// really is. Re-enable this test once that's fixed. -// See: https://bugs.chromium.org/p/webrtc/issues/detail?id=4453 +// This previously failed because the new channel re-used the ID of the closed +// channel, and the closed channel was incorrectly still assigned to the ID. TEST_P(PeerConnectionEndToEndTest, - DISABLED_DataChannelFromOpenWorksAfterClose) { + DataChannelFromOpenWorksAfterPreviousChannelClosed) { CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(), webrtc::MockAudioDecoderFactory::CreateEmptyFactory()); @@ -624,12 +620,49 @@ TEST_P(PeerConnectionEndToEndTest, WaitForConnection(); WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 0); - CloseDataChannels(caller_dc, callee_signaled_data_channels_, 0); + int first_channel_id = caller_dc->id(); + // Wait for the local side to say it's closed, but not the remote side. + // Previously, the channel on which Close is called reported being closed + // prematurely, and this caused issues; see bugs.webrtc.org/4453. + caller_dc->Close(); + EXPECT_EQ_WAIT(DataChannelInterface::kClosed, caller_dc->state(), kMaxWait); // Create a new channel and ensure it works after closing the previous one. caller_dc = caller_->CreateDataChannel("data2", init); - WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1); + // Since the second channel was created after the first finished closing, it + // should be able to re-use the first one's ID. + EXPECT_EQ(first_channel_id, caller_dc->id()); + TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1]); + + CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1); +} + +// Similar to the above test, but don't wait for the first channel to finish +// closing before creating the second one. +TEST_P(PeerConnectionEndToEndTest, + DataChannelFromOpenWorksWhilePreviousChannelClosing) { + CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(), + webrtc::MockAudioDecoderFactory::CreateEmptyFactory()); + + webrtc::DataChannelInit init; + rtc::scoped_refptr caller_dc( + caller_->CreateDataChannel("data", init)); + + Negotiate(); + WaitForConnection(); + + WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 0); + int first_channel_id = caller_dc->id(); + caller_dc->Close(); + + // Immediately create a new channel, before waiting for the previous one to + // transition to "closed". + caller_dc = caller_->CreateDataChannel("data2", init); + WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1); + // Since the second channel was created while the first was still closing, + // it should have been assigned a different ID. + EXPECT_NE(first_channel_id, caller_dc->id()); TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1]); CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1); diff --git a/pc/test/fakedatachannelprovider.h b/pc/test/fakedatachannelprovider.h index 9aa0271176..7e68f5fb85 100644 --- a/pc/test/fakedatachannelprovider.h +++ b/pc/test/fakedatachannelprovider.h @@ -75,6 +75,14 @@ class FakeDataChannelProvider : public webrtc::DataChannelProviderInterface { RTC_CHECK(sid >= 0); send_ssrcs_.erase(sid); recv_ssrcs_.erase(sid); + // Unlike the real SCTP transport, act like the closing procedure finished + // instantly, doing the same snapshot thing as below. + for (webrtc::DataChannel* ch : std::set( + connected_channels_.begin(), connected_channels_.end())) { + if (connected_channels_.count(ch)) { + ch->OnClosingProcedureComplete(sid); + } + } } bool ReadyToSendData() const override { return ready_to_send_; } diff --git a/sdk/android/instrumentationtests/src/org/webrtc/PeerConnectionTest.java b/sdk/android/instrumentationtests/src/org/webrtc/PeerConnectionTest.java index fa61b3ddff..7e3dee1d31 100644 --- a/sdk/android/instrumentationtests/src/org/webrtc/PeerConnectionTest.java +++ b/sdk/android/instrumentationtests/src/org/webrtc/PeerConnectionTest.java @@ -887,6 +887,8 @@ public class PeerConnectionTest { answeringExpectations.expectStateChange(DataChannel.State.CLOSED); answeringExpectations.dataChannel.close(); offeringExpectations.dataChannel.close(); + assertTrue(offeringExpectations.waitForAllExpectationsToBeSatisfied(TIMEOUT_SECONDS)); + assertTrue(answeringExpectations.waitForAllExpectationsToBeSatisfied(TIMEOUT_SECONDS)); // Test SetBitrate. assertTrue(offeringPC.setBitrate(100000, 5000000, 500000000)); @@ -1048,6 +1050,8 @@ public class PeerConnectionTest { answeringExpectations.expectStateChange(DataChannel.State.CLOSED); answeringExpectations.dataChannel.close(); offeringExpectations.dataChannel.close(); + assertTrue(offeringExpectations.waitForAllExpectationsToBeSatisfied(TIMEOUT_SECONDS)); + assertTrue(answeringExpectations.waitForAllExpectationsToBeSatisfied(TIMEOUT_SECONDS)); // Free the Java-land objects and collect them. shutdownPC(offeringPC, offeringExpectations);