diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn index 98680f62d2..d9ff347397 100644 --- a/p2p/BUILD.gn +++ b/p2p/BUILD.gn @@ -99,6 +99,7 @@ rtc_library("rtc_p2p") { "../rtc_base:checks", "../rtc_base:rtc_numerics", "../rtc_base/experiments:field_trial_parser", + "../rtc_base/synchronization:sequence_checker", # Needed by pseudo_tcp, which should move to a separate target. "../rtc_base:safe_minmax", diff --git a/p2p/base/dtls_transport.cc b/p2p/base/dtls_transport.cc index 1b7a66000d..52fe5c65a2 100644 --- a/p2p/base/dtls_transport.cc +++ b/p2p/base/dtls_transport.cc @@ -73,6 +73,8 @@ rtc::StreamResult StreamInterfaceChannel::Read(void* buffer, size_t buffer_len, size_t* read, int* error) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (state_ == rtc::SS_CLOSED) return rtc::SR_EOS; if (state_ == rtc::SS_OPENING) @@ -89,6 +91,7 @@ rtc::StreamResult StreamInterfaceChannel::Write(const void* data, size_t data_len, size_t* written, int* error) { + RTC_DCHECK_RUN_ON(&sequence_checker_); // Always succeeds, since this is an unreliable transport anyway. // TODO(zhihuang): Should this block if ice_transport_'s temporarily // unwritable? @@ -102,6 +105,7 @@ rtc::StreamResult StreamInterfaceChannel::Write(const void* data, } bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) { + RTC_DCHECK_RUN_ON(&sequence_checker_); if (packets_.size() > 0) { RTC_LOG(LS_WARNING) << "Packet already in queue."; } @@ -118,10 +122,12 @@ bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) { } rtc::StreamState StreamInterfaceChannel::GetState() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); return state_; } void StreamInterfaceChannel::Close() { + RTC_DCHECK_RUN_ON(&sequence_checker_); packets_.Clear(); state_ = rtc::SS_CLOSED; } diff --git a/p2p/base/dtls_transport.h b/p2p/base/dtls_transport.h index 89156a15d1..430c912330 100644 --- a/p2p/base/dtls_transport.h +++ b/p2p/base/dtls_transport.h @@ -24,6 +24,7 @@ #include "rtc_base/ssl_stream_adapter.h" #include "rtc_base/stream.h" #include "rtc_base/strings/string_builder.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/thread_checker.h" namespace rtc { @@ -54,9 +55,10 @@ class StreamInterfaceChannel : public rtc::StreamInterface { int* error) override; private: - IceTransportInternal* ice_transport_; // owned by DtlsTransport - rtc::StreamState state_; - rtc::BufferQueue packets_; + webrtc::SequenceChecker sequence_checker_; + IceTransportInternal* const ice_transport_; // owned by DtlsTransport + rtc::StreamState state_ RTC_GUARDED_BY(sequence_checker_); + rtc::BufferQueue packets_ RTC_GUARDED_BY(sequence_checker_); RTC_DISALLOW_COPY_AND_ASSIGN(StreamInterfaceChannel); }; diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index e15206c75a..a09c06ed75 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -151,6 +151,7 @@ rtc_library("rtc_base_approved") { ":stringutils", ":thread_checker", ":timeutils", + "synchronization:sequence_checker", ] } diff --git a/rtc_base/buffer_queue.cc b/rtc_base/buffer_queue.cc index adad9dda17..7879e933c7 100644 --- a/rtc_base/buffer_queue.cc +++ b/rtc_base/buffer_queue.cc @@ -21,23 +21,20 @@ BufferQueue::BufferQueue(size_t capacity, size_t default_size) : capacity_(capacity), default_size_(default_size) {} BufferQueue::~BufferQueue() { - webrtc::MutexLock lock(&mutex_); - - for (Buffer* buffer : queue_) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + for (Buffer* buffer : queue_) delete buffer; - } - for (Buffer* buffer : free_list_) { + for (Buffer* buffer : free_list_) delete buffer; - } } size_t BufferQueue::size() const { - webrtc::MutexLock lock(&mutex_); + RTC_DCHECK_RUN_ON(&sequence_checker_); return queue_.size(); } void BufferQueue::Clear() { - webrtc::MutexLock lock(&mutex_); + RTC_DCHECK_RUN_ON(&sequence_checker_); while (!queue_.empty()) { free_list_.push_back(queue_.front()); queue_.pop_front(); @@ -45,36 +42,30 @@ void BufferQueue::Clear() { } bool BufferQueue::ReadFront(void* buffer, size_t bytes, size_t* bytes_read) { - webrtc::MutexLock lock(&mutex_); - if (queue_.empty()) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (queue_.empty()) return false; - } - bool was_writable = queue_.size() < capacity_; Buffer* packet = queue_.front(); queue_.pop_front(); bytes = std::min(bytes, packet->size()); memcpy(buffer, packet->data(), bytes); - if (bytes_read) { + + if (bytes_read) *bytes_read = bytes; - } + free_list_.push_back(packet); - if (!was_writable) { - NotifyWritableForTest(); - } return true; } bool BufferQueue::WriteBack(const void* buffer, size_t bytes, size_t* bytes_written) { - webrtc::MutexLock lock(&mutex_); - if (queue_.size() == capacity_) { + RTC_DCHECK_RUN_ON(&sequence_checker_); + if (queue_.size() == capacity_) return false; - } - bool was_readable = !queue_.empty(); Buffer* packet; if (!free_list_.empty()) { packet = free_list_.back(); @@ -84,13 +75,10 @@ bool BufferQueue::WriteBack(const void* buffer, } packet->SetData(static_cast(buffer), bytes); - if (bytes_written) { + if (bytes_written) *bytes_written = bytes; - } + queue_.push_back(packet); - if (!was_readable) { - NotifyReadableForTest(); - } return true; } diff --git a/rtc_base/buffer_queue.h b/rtc_base/buffer_queue.h index 29d1a5b136..24a9b04dc2 100644 --- a/rtc_base/buffer_queue.h +++ b/rtc_base/buffer_queue.h @@ -18,16 +18,16 @@ #include "rtc_base/buffer.h" #include "rtc_base/constructor_magic.h" -#include "rtc_base/synchronization/mutex.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/thread_annotations.h" namespace rtc { -class BufferQueue { +class BufferQueue final { public: // Creates a buffer queue with a given capacity and default buffer size. BufferQueue(size_t capacity, size_t default_size); - virtual ~BufferQueue(); + ~BufferQueue(); // Return number of queued buffers. size_t size() const; @@ -44,17 +44,22 @@ class BufferQueue { // Returns true unless no data could be written. bool WriteBack(const void* data, size_t bytes, size_t* bytes_written); - protected: - // These methods are called when the state of the queue changes. - virtual void NotifyReadableForTest() {} - virtual void NotifyWritableForTest() {} + bool is_writable() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); + return queue_.size() < capacity_; + } + + bool is_readable() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); + return !queue_.empty(); + } private: - size_t capacity_; - size_t default_size_; - mutable webrtc::Mutex mutex_; - std::deque queue_ RTC_GUARDED_BY(mutex_); - std::vector free_list_ RTC_GUARDED_BY(mutex_); + webrtc::SequenceChecker sequence_checker_; + const size_t capacity_; + const size_t default_size_; + std::deque queue_ RTC_GUARDED_BY(sequence_checker_); + std::vector free_list_ RTC_GUARDED_BY(sequence_checker_); RTC_DISALLOW_COPY_AND_ASSIGN(BufferQueue); }; diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc index c1017b773d..bfbaf0f301 100644 --- a/rtc_base/ssl_stream_adapter_unittest.cc +++ b/rtc_base/ssl_stream_adapter_unittest.cc @@ -231,10 +231,10 @@ class SSLDummyStreamTLS : public SSLDummyStreamBase { : SSLDummyStreamBase(test, side, in, out) {} }; -class BufferQueueStream : public rtc::BufferQueue, public rtc::StreamInterface { +class BufferQueueStream : public rtc::StreamInterface { public: BufferQueueStream(size_t capacity, size_t default_size) - : rtc::BufferQueue(capacity, default_size) {} + : buffer_(capacity, default_size) {} // Implementation of abstract StreamInterface methods. @@ -246,9 +246,13 @@ class BufferQueueStream : public rtc::BufferQueue, public rtc::StreamInterface { size_t buffer_len, size_t* read, int* error) override { - if (!ReadFront(buffer, buffer_len, read)) { + const bool was_writable = buffer_.is_writable(); + if (!buffer_.ReadFront(buffer, buffer_len, read)) return rtc::SR_BLOCK; - } + + if (!was_writable) + NotifyWritableForTest(); + return rtc::SR_SUCCESS; } @@ -257,9 +261,13 @@ class BufferQueueStream : public rtc::BufferQueue, public rtc::StreamInterface { size_t data_len, size_t* written, int* error) override { - if (!WriteBack(data, data_len, written)) { + const bool was_readable = buffer_.is_readable(); + if (!buffer_.WriteBack(data, data_len, written)) return rtc::SR_BLOCK; - } + + if (!was_readable) + NotifyReadableForTest(); + return rtc::SR_SUCCESS; } @@ -267,9 +275,12 @@ class BufferQueueStream : public rtc::BufferQueue, public rtc::StreamInterface { void Close() override {} protected: - void NotifyReadableForTest() override { PostEvent(rtc::SE_READ, 0); } + void NotifyReadableForTest() { PostEvent(rtc::SE_READ, 0); } - void NotifyWritableForTest() override { PostEvent(rtc::SE_WRITE, 0); } + void NotifyWritableForTest() { PostEvent(rtc::SE_WRITE, 0); } + + private: + rtc::BufferQueue buffer_; }; class SSLDummyStreamDTLS : public SSLDummyStreamBase {