diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn index 43bd800316..12db7fbe11 100644 --- a/test/pc/e2e/BUILD.gn +++ b/test/pc/e2e/BUILD.gn @@ -36,7 +36,7 @@ if (!build_with_chromium) { deps = [ ":default_video_quality_analyzer_frames_comparator_test", ":default_video_quality_analyzer_test", - ":multi_head_queue_test", + ":multi_reader_queue_test", ":names_collection_test", ":peer_connection_e2e_smoke_test", ":single_process_encoded_image_data_injector_unittest", @@ -585,11 +585,11 @@ if (!build_with_chromium) { ] } - rtc_library("multi_head_queue_test") { + rtc_library("multi_reader_queue_test") { testonly = true - sources = [ "analyzer/video/multi_head_queue_test.cc" ] + sources = [ "analyzer/video/multi_reader_queue_test.cc" ] deps = [ - ":multi_head_queue", + ":multi_reader_queue", "../../../test:test_support", ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] @@ -692,7 +692,6 @@ if (!build_with_chromium) { deps = [ ":default_video_quality_analyzer_internal", ":default_video_quality_analyzer_shared", - ":multi_head_queue", "../..:perf_test", "../../../api:array_view", "../../../api:video_quality_analyzer_api", @@ -748,7 +747,7 @@ if (!build_with_chromium) { deps = [ ":default_video_quality_analyzer_shared", - ":multi_head_queue", + ":multi_reader_queue", "../../../api:array_view", "../../../api:scoped_refptr", "../../../api/numerics:numerics", @@ -895,10 +894,10 @@ if (!build_with_chromium) { ] } - rtc_library("multi_head_queue") { + rtc_library("multi_reader_queue") { visibility = [ "*" ] testonly = true - sources = [ "analyzer/video/multi_head_queue.h" ] + sources = [ "analyzer/video/multi_reader_queue.h" ] deps = [ "../../../rtc_base:checks" ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h index 6d4818f15d..188588bd5c 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.h @@ -39,7 +39,6 @@ #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_internal_shared_objects.h" #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h" #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h" -#include "test/pc/e2e/analyzer/video/multi_head_queue.h" #include "test/pc/e2e/analyzer/video/names_collection.h" #include "test/testsupport/perf_test.h" diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h index c127dd1f67..8cf41a3eaa 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h @@ -15,7 +15,7 @@ #include "absl/types/optional.h" #include "api/units/timestamp.h" -#include "test/pc/e2e/analyzer/video/multi_head_queue.h" +#include "test/pc/e2e/analyzer/video/multi_reader_queue.h" namespace webrtc { @@ -50,7 +50,7 @@ class StreamState { // When new peer is added - all current alive frames will be sent to it as // well. So we need to register them as expected by copying owner_ head to // the new head. - void AddPeer() { frame_ids_.AddHead(GetAliveFramesQueueIndex()); } + void AddPeer() { frame_ids_.AddReader(GetAliveFramesQueueIndex()); } size_t GetAliveFramesCount() const { return frame_ids_.size(GetAliveFramesQueueIndex()); @@ -86,7 +86,7 @@ class StreamState { // If we received frame with id frame_id3, then we will pop frame_id1 and // frame_id2 and consider those frames as dropped and then compare received // frame with the one from `FrameInFlight` with id frame_id3. - MultiHeadQueue frame_ids_; + MultiReaderQueue frame_ids_; std::map last_rendered_frame_time_; }; diff --git a/test/pc/e2e/analyzer/video/multi_head_queue.h b/test/pc/e2e/analyzer/video/multi_head_queue.h deleted file mode 100644 index eef862b8fc..0000000000 --- a/test/pc/e2e/analyzer/video/multi_head_queue.h +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_ -#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_ - -#include -#include -#include - -#include "absl/types/optional.h" -#include "rtc_base/checks.h" - -namespace webrtc { - -// A queue that allows more than one reader. Readers are independent, and all -// readers will see all elements; an inserted element stays in the queue until -// all readers have extracted it. Elements are copied and copying is assumed to -// be cheap. -template -class MultiHeadQueue { - public: - // Creates queue with exactly `readers_count` readers. - explicit MultiHeadQueue(size_t readers_count) { - for (size_t i = 0; i < readers_count; ++i) { - queues_.push_back(std::deque()); - } - } - - // Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()). - // `copy_index` - index of the queue that will be used as a source for - // copying. - void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); } - - // Add value to the end of the queue. Complexity O(readers_count). - void PushBack(T value) { - for (auto& queue : queues_) { - queue.push_back(value); - } - } - - // Extract element from specified head. Complexity O(1). - absl::optional PopFront(size_t index) { - RTC_CHECK_LT(index, queues_.size()); - if (queues_[index].empty()) { - return absl::nullopt; - } - T out = queues_[index].front(); - queues_[index].pop_front(); - return out; - } - - // Returns element at specified head. Complexity O(1). - absl::optional Front(size_t index) const { - RTC_CHECK_LT(index, queues_.size()); - if (queues_[index].empty()) { - return absl::nullopt; - } - return queues_[index].front(); - } - - // Returns true if for specified head there are no more elements in the queue - // or false otherwise. Complexity O(1). - bool IsEmpty(size_t index) const { - RTC_CHECK_LT(index, queues_.size()); - return queues_[index].empty(); - } - - // Returns size of the longest queue between all readers. - // Complexity O(readers_count). - size_t size() const { - size_t size = 0; - for (auto& queue : queues_) { - if (queue.size() > size) { - size = queue.size(); - } - } - return size; - } - - // Returns size of the specified queue. Complexity O(1). - size_t size(size_t index) const { - RTC_CHECK_LT(index, queues_.size()); - return queues_[index].size(); - } - - size_t readers_count() const { return queues_.size(); } - - private: - std::vector> queues_; -}; - -} // namespace webrtc - -#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_ diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue.h b/test/pc/e2e/analyzer/video/multi_reader_queue.h new file mode 100644 index 0000000000..c8a7db0cec --- /dev/null +++ b/test/pc/e2e/analyzer/video/multi_reader_queue.h @@ -0,0 +1,163 @@ +/* + * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ +#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ + +#include +#include +#include +#include + +#include "absl/types/optional.h" +#include "rtc_base/checks.h" + +namespace webrtc { + +// Represents the queue which can be read by multiple readers. Each reader reads +// from its own queue head. When an element is added it will become visible for +// all readers. When an element will be removed by all the readers, the element +// will be removed from the queue. +template +class MultiReaderQueue { + public: + // Creates queue with exactly `readers_count` readers named from 0 to + // `readers_count - 1`. + explicit MultiReaderQueue(size_t readers_count) { + for (size_t i = 0; i < readers_count; ++i) { + heads_[i] = 0; + } + } + // Creates queue with specified readers. + explicit MultiReaderQueue(std::set readers) { + for (size_t reader : readers) { + heads_[reader] = 0; + } + } + + // Adds a new reader, initializing its reading position (the reader's head) + // equal to the one of `reader_to_copy`. New reader will have name index + // equal to the current readers count. + // Complexity O(MultiReaderQueue::size(reader_to_copy)). + void AddReader(size_t reader_to_copy) { + AddReader(heads_.size(), reader_to_copy); + } + + // Adds a new `reader`, initializing its reading position (the reader's head) + // equal to the one of `reader_to_copy`. + // Complexity O(MultiReaderQueue::size(reader_to_copy)). + void AddReader(size_t reader, size_t reader_to_copy) { + size_t pos = GetHeadPositionOrDie(reader_to_copy); + + auto it = heads_.find(reader); + RTC_CHECK(it == heads_.end()) + << "Reader " << reader << " is already in the queue"; + heads_[reader] = heads_[reader_to_copy]; + for (size_t i = pos; i < queue_.size(); ++i) { + in_queues_[i]++; + } + } + + // Removes specified `reader` from the queue. + // Complexity O(MultiReaderQueue::size(reader)). + void RemoveReader(size_t reader) { + size_t pos = GetHeadPositionOrDie(reader); + for (size_t i = pos; i < queue_.size(); ++i) { + in_queues_[i]--; + } + while (!in_queues_.empty() && in_queues_[0] == 0) { + PopFront(); + } + heads_.erase(reader); + } + + // Add value to the end of the queue. Complexity O(1). + void PushBack(T value) { + queue_.push_back(value); + in_queues_.push_back(heads_.size()); + } + + // Extract element from specified head. Complexity O(1). + absl::optional PopFront(size_t reader) { + size_t pos = GetHeadPositionOrDie(reader); + if (pos >= queue_.size()) { + return absl::nullopt; + } + + T out = queue_[pos]; + + in_queues_[pos]--; + heads_[reader]++; + + if (in_queues_[pos] == 0) { + RTC_DCHECK_EQ(pos, 0); + PopFront(); + } + return out; + } + + // Returns element at specified head. Complexity O(1). + absl::optional Front(size_t reader) const { + size_t pos = GetHeadPositionOrDie(reader); + if (pos >= queue_.size()) { + return absl::nullopt; + } + return queue_[pos]; + } + + // Returns true if for specified head there are no more elements in the queue + // or false otherwise. Complexity O(1). + bool IsEmpty(size_t reader) const { + size_t pos = GetHeadPositionOrDie(reader); + return pos >= queue_.size(); + } + + // Returns size of the longest queue between all readers. + // Complexity O(1). + size_t size() const { return queue_.size(); } + + // Returns size of the specified queue. Complexity O(1). + size_t size(size_t reader) const { + size_t pos = GetHeadPositionOrDie(reader); + return queue_.size() - pos; + } + + // Complexity O(1). + size_t readers_count() const { return heads_.size(); } + + private: + size_t GetHeadPositionOrDie(size_t reader) const { + auto it = heads_.find(reader); + RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader; + return it->second - removed_elements_count_; + } + + void PopFront() { + RTC_DCHECK(!queue_.empty()); + RTC_DCHECK_EQ(in_queues_[0], 0); + queue_.pop_front(); + in_queues_.pop_front(); + removed_elements_count_++; + } + + // Number of the elements that were removed from the queue. It is used to + // subtract from each head to compute the right index inside `queue_`; + size_t removed_elements_count_ = 0; + std::deque queue_; + // In how may queues the element at index `i` is. An element can be removed + // from the front if and only if it is in 0 queues. + std::deque in_queues_; + // Map from the reader to the head position in the queue. + std::unordered_map heads_; +}; + +} // namespace webrtc + +#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_ diff --git a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc similarity index 53% rename from test/pc/e2e/analyzer/video/multi_head_queue_test.cc rename to test/pc/e2e/analyzer/video/multi_reader_queue_test.cc index 2aa6fd80e5..e41c4a8208 100644 --- a/test/pc/e2e/analyzer/video/multi_head_queue_test.cc +++ b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc @@ -8,24 +8,57 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "test/pc/e2e/analyzer/video/multi_head_queue.h" +#include "test/pc/e2e/analyzer/video/multi_reader_queue.h" + #include "absl/types/optional.h" #include "test/gtest.h" namespace webrtc { namespace { -TEST(MultiHeadQueueTest, GetOnEmpty) { - MultiHeadQueue queue = MultiHeadQueue(10); - EXPECT_TRUE(queue.IsEmpty(0)); +TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) { + MultiReaderQueue queue = MultiReaderQueue(10); + EXPECT_EQ(queue.size(), 0lu); for (int i = 0; i < 10; ++i) { + EXPECT_TRUE(queue.IsEmpty(i)); + EXPECT_EQ(queue.size(i), 0lu); EXPECT_FALSE(queue.PopFront(i).has_value()); EXPECT_FALSE(queue.Front(i).has_value()); } } -TEST(MultiHeadQueueTest, SingleHeadOneAddOneRemove) { - MultiHeadQueue queue = MultiHeadQueue(1); +TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) { + MultiReaderQueue queue = MultiReaderQueue(10); + queue.PushBack(1); + queue.PushBack(2); + queue.PushBack(3); + EXPECT_EQ(queue.size(), 3lu); + for (int i = 0; i < 10; ++i) { + EXPECT_FALSE(queue.IsEmpty(i)); + EXPECT_EQ(queue.size(i), 3lu); + } +} + +TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) { + MultiReaderQueue queue = MultiReaderQueue(10); + for (int i = 0; i < 5; ++i) { + queue.PushBack(i); + } + EXPECT_EQ(queue.size(), 5lu); + // Removing elements from queue #0 + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(queue.size(0), static_cast(5 - i)); + EXPECT_EQ(queue.PopFront(0), absl::optional(i)); + for (int j = 1; j < 10; ++j) { + EXPECT_EQ(queue.size(j), 5lu); + } + } + EXPECT_EQ(queue.size(0), 0lu); + EXPECT_TRUE(queue.IsEmpty(0)); +} + +TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) { + MultiReaderQueue queue = MultiReaderQueue(1); queue.PushBack(1); EXPECT_EQ(queue.size(), 1lu); EXPECT_TRUE(queue.Front(0).has_value()); @@ -37,22 +70,21 @@ TEST(MultiHeadQueueTest, SingleHeadOneAddOneRemove) { EXPECT_TRUE(queue.IsEmpty(0)); } -TEST(MultiHeadQueueTest, SingleHead) { - MultiHeadQueue queue = MultiHeadQueue(1); +TEST(MultiReaderQueueTest, SingleHead) { + MultiReaderQueue queue = MultiReaderQueue(1); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); EXPECT_EQ(queue.size(), i + 1); } for (size_t i = 0; i < 10; ++i) { - absl::optional value = queue.PopFront(0); + EXPECT_EQ(queue.Front(0), absl::optional(i)); + EXPECT_EQ(queue.PopFront(0), absl::optional(i)); EXPECT_EQ(queue.size(), 10 - i - 1); - ASSERT_TRUE(value.has_value()); - EXPECT_EQ(value.value(), i); } } -TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAllPerHead) { - MultiHeadQueue queue = MultiHeadQueue(3); +TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) { + MultiReaderQueue queue = MultiReaderQueue(3); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); EXPECT_EQ(queue.size(), i + 1); @@ -77,8 +109,8 @@ TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAllPerHead) { } } -TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAll) { - MultiHeadQueue queue = MultiHeadQueue(3); +TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) { + MultiReaderQueue queue = MultiReaderQueue(3); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); EXPECT_EQ(queue.size(), i + 1); @@ -97,14 +129,14 @@ TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAll) { } } -TEST(MultiHeadQueueTest, HeadCopy) { - MultiHeadQueue queue = MultiHeadQueue(1); +TEST(MultiReaderQueueTest, AddReader) { + MultiReaderQueue queue = MultiReaderQueue(1); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); EXPECT_EQ(queue.size(), i + 1); } - queue.AddHead(0); - EXPECT_EQ(queue.readers_count(), 2u); + queue.AddReader(0); + EXPECT_EQ(queue.readers_count(), 2lu); for (size_t i = 0; i < 10; ++i) { absl::optional value1 = queue.PopFront(0); absl::optional value2 = queue.PopFront(1); @@ -116,5 +148,31 @@ TEST(MultiHeadQueueTest, HeadCopy) { } } +TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) { + MultiReaderQueue queue = MultiReaderQueue(2); + for (size_t i = 0; i < 10; ++i) { + queue.PushBack(i); + } + EXPECT_EQ(queue.size(1), 10lu); + + queue.RemoveReader(0); + + EXPECT_EQ(queue.readers_count(), 1lu); + EXPECT_EQ(queue.size(1), 10lu); +} + +TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) { + MultiReaderQueue queue = MultiReaderQueue(1); + for (size_t i = 0; i < 10; ++i) { + queue.PushBack(i); + } + EXPECT_EQ(queue.size(), 10lu); + + queue.RemoveReader(0); + + EXPECT_EQ(queue.size(), 0lu); + EXPECT_EQ(queue.readers_count(), 0lu); +} + } // namespace } // namespace webrtc