From f2c2f8f20cdc2e77ca69b36b9daa00c4de220022 Mon Sep 17 00:00:00 2001 From: zhihuang Date: Wed, 13 Jul 2016 14:13:49 -0700 Subject: [PATCH] Refactoring on QUIC related classes. Merge with the latest webrtc native code. Remove deprecated function Connect() in QuicTransportChannel. Fix the compiling issue and broken unit tests by adding the network thread to QUIC related classes. Review-Url: https://codereview.webrtc.org/2089553002 Cr-Commit-Position: refs/heads/master@{#13472} --- webrtc/api/quicdatachannel.cc | 18 ++++++++++-------- webrtc/api/quicdatachannel.h | 11 ++++++++--- webrtc/api/quicdatachannel_unittest.cc | 7 +++---- webrtc/api/quicdatatransport.cc | 12 ++++++++---- webrtc/api/quicdatatransport.h | 7 +++++-- webrtc/api/quicdatatransport_unittest.cc | 6 +++--- webrtc/p2p/quic/quicsession_unittest.cc | 2 -- webrtc/p2p/quic/quictransportchannel.cc | 3 ++- webrtc/p2p/quic/quictransportchannel.h | 3 --- .../p2p/quic/quictransportchannel_unittest.cc | 4 ---- webrtc/p2p/quic/reliablequicstream_unittest.cc | 8 ++++---- 11 files changed, 43 insertions(+), 38 deletions(-) diff --git a/webrtc/api/quicdatachannel.cc b/webrtc/api/quicdatachannel.cc index f4f573276c..5493382e1a 100644 --- a/webrtc/api/quicdatachannel.cc +++ b/webrtc/api/quicdatachannel.cc @@ -61,10 +61,12 @@ bool ParseQuicDataMessageHeader(const char* data, QuicDataChannel::QuicDataChannel(rtc::Thread* signaling_thread, rtc::Thread* worker_thread, + rtc::Thread* network_thread, const std::string& label, const DataChannelInit& config) : signaling_thread_(signaling_thread), worker_thread_(worker_thread), + network_thread_(network_thread), id_(config.id), state_(kConnecting), buffered_amount_(0), @@ -91,12 +93,12 @@ bool QuicDataChannel::Send(const DataBuffer& buffer) { << " is not open so cannot send."; return false; } - return worker_thread_->Invoke( - RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_w, this, buffer)); + return network_thread_->Invoke( + RTC_FROM_HERE, rtc::Bind(&QuicDataChannel::Send_n, this, buffer)); } -bool QuicDataChannel::Send_w(const DataBuffer& buffer) { - RTC_DCHECK(worker_thread_->IsCurrent()); +bool QuicDataChannel::Send_n(const DataBuffer& buffer) { + RTC_DCHECK(network_thread_->IsCurrent()); // Encode and send the header containing the data channel ID and message ID. rtc::CopyOnWriteBuffer header; @@ -256,7 +258,7 @@ DataChannelInterface::DataState QuicDataChannel::SetTransportChannel_w() { } void QuicDataChannel::OnIncomingMessage(Message&& message) { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(network_thread_->IsCurrent()); RTC_DCHECK(message.stream); if (!observer_) { LOG(LS_WARNING) << "QUIC data channel " << id_ @@ -295,7 +297,7 @@ void QuicDataChannel::OnIncomingMessage(Message&& message) { void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, const char* data, size_t len) { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(network_thread_->IsCurrent()); RTC_DCHECK(data); const auto& kv = incoming_quic_messages_.find(stream_id); if (kv == incoming_quic_messages_.end()) { @@ -325,7 +327,7 @@ void QuicDataChannel::OnDataReceived(net::QuicStreamId stream_id, } void QuicDataChannel::OnReadyToSend(cricket::TransportChannel* channel) { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(network_thread_->IsCurrent()); RTC_DCHECK(channel == quic_transport_channel_); LOG(LS_INFO) << "QuicTransportChannel is ready to send"; invoker_.AsyncInvoke( @@ -342,7 +344,7 @@ void QuicDataChannel::OnWriteBlockedStreamClosed(net::QuicStreamId stream_id, void QuicDataChannel::OnIncomingQueuedStreamClosed(net::QuicStreamId stream_id, int error) { - RTC_DCHECK(worker_thread_->IsCurrent()); + RTC_DCHECK(network_thread_->IsCurrent()); LOG(LS_VERBOSE) << "Incoming queued stream " << stream_id << " is closed."; incoming_quic_messages_.erase(stream_id); } diff --git a/webrtc/api/quicdatachannel.h b/webrtc/api/quicdatachannel.h index a6b987b144..18a10acbdf 100644 --- a/webrtc/api/quicdatachannel.h +++ b/webrtc/api/quicdatachannel.h @@ -88,6 +88,7 @@ class QuicDataChannel : public rtc::RefCountedObject, QuicDataChannel(rtc::Thread* signaling_thread, rtc::Thread* worker_thread, + rtc::Thread* network_thread, const std::string& label, const DataChannelInit& config); ~QuicDataChannel() override; @@ -155,11 +156,13 @@ class QuicDataChannel : public rtc::RefCountedObject, void OnReadyToSend(cricket::TransportChannel* channel); void OnConnectionClosed(); - // Worker thread methods. + // Network thread methods. // Sends the data buffer to the remote peer using an outgoing QUIC stream. // Returns true if the data buffer can be successfully sent, or if it is // queued to be sent later. - bool Send_w(const DataBuffer& buffer); + bool Send_n(const DataBuffer& buffer); + + // Worker thread methods. // Connects the |quic_transport_channel_| signals to this QuicDataChannel, // then returns the new QuicDataChannel state. DataState SetTransportChannel_w(); @@ -185,8 +188,10 @@ class QuicDataChannel : public rtc::RefCountedObject, cricket::QuicTransportChannel* quic_transport_channel_ = nullptr; // Signaling thread for DataChannelInterface methods. rtc::Thread* const signaling_thread_; - // Worker thread for sending data and |quic_transport_channel_| callbacks. + // Worker thread for |quic_transport_channel_| callbacks. rtc::Thread* const worker_thread_; + // Network thread for sending data and |quic_transport_channel_| callbacks. + rtc::Thread* const network_thread_; rtc::AsyncInvoker invoker_; // Map of QUIC stream ID => ReliableQuicStream* for write blocked QUIC // streams. diff --git a/webrtc/api/quicdatachannel_unittest.cc b/webrtc/api/quicdatachannel_unittest.cc index e701c29b4f..7245ccfa21 100644 --- a/webrtc/api/quicdatachannel_unittest.cc +++ b/webrtc/api/quicdatachannel_unittest.cc @@ -120,8 +120,9 @@ class FakeQuicDataTransport : public sigslot::has_slots<> { DataChannelInit config; config.id = id; config.protocol = protocol; - rtc::scoped_refptr data_channel(new QuicDataChannel( - rtc::Thread::Current(), rtc::Thread::Current(), label, config)); + rtc::scoped_refptr data_channel( + new QuicDataChannel(rtc::Thread::Current(), rtc::Thread::Current(), + rtc::Thread::Current(), label, config)); data_channel_by_id_[id] = data_channel; return data_channel; } @@ -201,8 +202,6 @@ class QuicDataChannelPeer { // Connects |ice_transport_channel_| to that of the other peer. void Connect(QuicDataChannelPeer* other_peer) { - ice_transport_channel_->Connect(); - other_peer->ice_transport_channel_->Connect(); ice_transport_channel_->SetDestination(other_peer->ice_transport_channel_); } diff --git a/webrtc/api/quicdatatransport.cc b/webrtc/api/quicdatatransport.cc index 70ad03dbfd..c1caf54067 100644 --- a/webrtc/api/quicdatatransport.cc +++ b/webrtc/api/quicdatatransport.cc @@ -17,10 +17,14 @@ namespace webrtc { QuicDataTransport::QuicDataTransport(rtc::Thread* signaling_thread, - rtc::Thread* worker_thread) - : signaling_thread_(signaling_thread), worker_thread_(worker_thread) { + rtc::Thread* worker_thread, + rtc::Thread* network_thread) + : signaling_thread_(signaling_thread), + worker_thread_(worker_thread), + network_thread_(network_thread) { RTC_DCHECK(signaling_thread_); RTC_DCHECK(worker_thread_); + RTC_DCHECK(network_thread_); } QuicDataTransport::~QuicDataTransport() {} @@ -68,8 +72,8 @@ rtc::scoped_refptr QuicDataTransport::CreateDataChannel( LOG(LS_ERROR) << "QUIC data channel already exists with id " << config->id; return nullptr; } - rtc::scoped_refptr data_channel( - new QuicDataChannel(signaling_thread_, worker_thread_, label, *config)); + rtc::scoped_refptr data_channel(new QuicDataChannel( + signaling_thread_, worker_thread_, network_thread_, label, *config)); if (quic_transport_channel_) { if (!data_channel->SetTransportChannel(quic_transport_channel_)) { LOG(LS_ERROR) diff --git a/webrtc/api/quicdatatransport.h b/webrtc/api/quicdatatransport.h index f0c427d1b5..96fe2a0ad7 100644 --- a/webrtc/api/quicdatatransport.h +++ b/webrtc/api/quicdatatransport.h @@ -36,7 +36,9 @@ namespace webrtc { // exists, it sends the QUIC stream to the QuicDataChannel. class QuicDataTransport : public sigslot::has_slots<> { public: - QuicDataTransport(rtc::Thread* signaling_thread, rtc::Thread* worker_thread); + QuicDataTransport(rtc::Thread* signaling_thread, + rtc::Thread* worker_thread, + rtc::Thread* network_thread); ~QuicDataTransport() override; // Sets the QUIC transport channel for the QuicDataChannels and the @@ -80,9 +82,10 @@ class QuicDataTransport : public sigslot::has_slots<> { quic_stream_by_id_; // QuicTransportChannel for sending/receiving data. cricket::QuicTransportChannel* quic_transport_channel_ = nullptr; - // Signaling and worker threads for the QUIC data channel. + // Threads for the QUIC data channel. rtc::Thread* const signaling_thread_; rtc::Thread* const worker_thread_; + rtc::Thread* const network_thread_; }; } // namespace webrtc diff --git a/webrtc/api/quicdatatransport_unittest.cc b/webrtc/api/quicdatatransport_unittest.cc index d668c55b0b..975898ef1f 100644 --- a/webrtc/api/quicdatatransport_unittest.cc +++ b/webrtc/api/quicdatatransport_unittest.cc @@ -64,7 +64,9 @@ class FakeObserver : public DataChannelObserver { class QuicDataTransportPeer { public: QuicDataTransportPeer() - : quic_data_transport_(rtc::Thread::Current(), rtc::Thread::Current()), + : quic_data_transport_(rtc::Thread::Current(), + rtc::Thread::Current(), + rtc::Thread::Current()), ice_transport_channel_(new FakeTransportChannel("data", 0)), quic_transport_channel_(ice_transport_channel_) { ice_transport_channel_->SetAsync(true); @@ -80,8 +82,6 @@ class QuicDataTransportPeer { // Connects |ice_transport_channel_| to that of the other peer. void Connect(QuicDataTransportPeer* other_peer) { - ice_transport_channel_->Connect(); - other_peer->ice_transport_channel_->Connect(); ice_transport_channel_->SetDestination(other_peer->ice_transport_channel_); } diff --git a/webrtc/p2p/quic/quicsession_unittest.cc b/webrtc/p2p/quic/quicsession_unittest.cc index 2f3aaae332..65996e5c1a 100644 --- a/webrtc/p2p/quic/quicsession_unittest.cc +++ b/webrtc/p2p/quic/quicsession_unittest.cc @@ -295,8 +295,6 @@ void QuicSessionTest::CreateClientAndServerSessions() { channel2->SetAsync(true); // Configure peers to send packets to each other. - channel1->Connect(); - channel2->Connect(); channel1->SetDestination(channel2.get()); client_peer_ = CreateSession(std::move(channel1), Perspective::IS_CLIENT); diff --git a/webrtc/p2p/quic/quictransportchannel.cc b/webrtc/p2p/quic/quictransportchannel.cc index e86fe6a0b2..29819c6269 100644 --- a/webrtc/p2p/quic/quictransportchannel.cc +++ b/webrtc/p2p/quic/quictransportchannel.cc @@ -395,7 +395,8 @@ void QuicTransportChannel::OnRouteChange(TransportChannel* channel, void QuicTransportChannel::OnSelectedCandidatePairChanged( TransportChannel* channel, CandidatePairInterface* selected_candidate_pair, - int last_sent_packet_id bool ready_to_send) { + int last_sent_packet_id, + bool ready_to_send) { ASSERT(channel == channel_.get()); SignalSelectedCandidatePairChanged(this, selected_candidate_pair, last_sent_packet_id, ready_to_send); diff --git a/webrtc/p2p/quic/quictransportchannel.h b/webrtc/p2p/quic/quictransportchannel.h index 22a33eaf0f..1ab13fa0b2 100644 --- a/webrtc/p2p/quic/quictransportchannel.h +++ b/webrtc/p2p/quic/quictransportchannel.h @@ -166,9 +166,6 @@ class QuicTransportChannel : public TransportChannelImpl, void SetIceConfig(const IceConfig& config) override { channel_->SetIceConfig(config); } - void Connect() override { - channel_->Connect(); - } // QuicPacketWriter overrides. // Called from net::QuicConnection when |quic_| has packets to write. diff --git a/webrtc/p2p/quic/quictransportchannel_unittest.cc b/webrtc/p2p/quic/quictransportchannel_unittest.cc index 0e16390a89..49ca29cd14 100644 --- a/webrtc/p2p/quic/quictransportchannel_unittest.cc +++ b/webrtc/p2p/quic/quictransportchannel_unittest.cc @@ -112,8 +112,6 @@ class QuicTestPeer : public sigslot::has_slots<> { // Connects |ice_channel_| to that of the other peer. void Connect(QuicTestPeer* other_peer) { - ice_channel_->Connect(); - other_peer->ice_channel_->Connect(); ice_channel_->SetDestination(other_peer->ice_channel_); } @@ -419,8 +417,6 @@ TEST_F(QuicTransportChannelTest, TransferInvalidSrtp) { // Test that QuicTransportChannel::WritePacket blocks when the ICE // channel is not writable, and otherwise succeeds. TEST_F(QuicTransportChannelTest, QuicWritePacket) { - peer1_.ice_channel()->Connect(); - peer2_.ice_channel()->Connect(); peer1_.ice_channel()->SetDestination(peer2_.ice_channel()); std::string packet = "FAKEQUICPACKET"; diff --git a/webrtc/p2p/quic/reliablequicstream_unittest.cc b/webrtc/p2p/quic/reliablequicstream_unittest.cc index cf9f5e92dd..ff517afb58 100644 --- a/webrtc/p2p/quic/reliablequicstream_unittest.cc +++ b/webrtc/p2p/quic/reliablequicstream_unittest.cc @@ -49,6 +49,7 @@ using rtc::SR_BLOCK; // Arbitrary number for a stream's write blocked priority. static const SpdyPriority kDefaultPriority = 3; +static const net::QuicStreamId kStreamId = 5; // QuicSession that does not create streams and writes data from // ReliableQuicStream to a string. @@ -78,7 +79,7 @@ class MockQuicSession : public QuicSession { net::ReliableQuicStream* CreateIncomingDynamicStream( QuicStreamId id) override { - return nullptr; + return new ReliableQuicStream(kStreamId, this); } net::ReliableQuicStream* CreateOutgoingDynamicStream( @@ -142,7 +143,6 @@ class ReliableQuicStreamTest : public ::testing::Test, ReliableQuicStreamTest() {} void CreateReliableQuicStream() { - const net::QuicStreamId kStreamId = 5; // Arbitrary values for QuicConnection. QuicConnectionHelper* quic_helper = @@ -232,7 +232,7 @@ TEST_F(ReliableQuicStreamTest, BufferData) { // Read an entire string. TEST_F(ReliableQuicStreamTest, ReadDataWhole) { CreateReliableQuicStream(); - net::QuicStreamFrame frame(-1, false, 0, "Hello, World!"); + net::QuicStreamFrame frame(kStreamId, false, 0, "Hello, World!"); stream_->OnStreamFrame(frame); EXPECT_EQ("Hello, World!", read_buffer_); @@ -241,7 +241,7 @@ TEST_F(ReliableQuicStreamTest, ReadDataWhole) { // Read part of a string. TEST_F(ReliableQuicStreamTest, ReadDataPartial) { CreateReliableQuicStream(); - net::QuicStreamFrame frame(-1, false, 0, "Hello, World!"); + net::QuicStreamFrame frame(kStreamId, false, 0, "Hello, World!"); frame.frame_length = 5; stream_->OnStreamFrame(frame);