[DVQA] Rewrite MultiHeadQueue and add ability to remove head

Rewrite MultiHeadQueue reducing space complexity from
O(readers count * queue size) to O(queue size + readers count).

Bug: b/231397778
Change-Id: Ifbd9c686915368773916ed86467f4de3f8e06af1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265621
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37197}
This commit is contained in:
Artem Titov
2022-06-13 15:05:21 +02:00
committed by WebRTC LUCI CQ
parent 1709341fd9
commit 69a32cc2d3
6 changed files with 250 additions and 133 deletions

View File

@ -36,7 +36,7 @@ if (!build_with_chromium) {
deps = [
":default_video_quality_analyzer_frames_comparator_test",
":default_video_quality_analyzer_test",
":multi_head_queue_test",
":multi_reader_queue_test",
":names_collection_test",
":peer_connection_e2e_smoke_test",
":single_process_encoded_image_data_injector_unittest",
@ -585,11 +585,11 @@ if (!build_with_chromium) {
]
}
rtc_library("multi_head_queue_test") {
rtc_library("multi_reader_queue_test") {
testonly = true
sources = [ "analyzer/video/multi_head_queue_test.cc" ]
sources = [ "analyzer/video/multi_reader_queue_test.cc" ]
deps = [
":multi_head_queue",
":multi_reader_queue",
"../../../test:test_support",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
@ -692,7 +692,6 @@ if (!build_with_chromium) {
deps = [
":default_video_quality_analyzer_internal",
":default_video_quality_analyzer_shared",
":multi_head_queue",
"../..:perf_test",
"../../../api:array_view",
"../../../api:video_quality_analyzer_api",
@ -748,7 +747,7 @@ if (!build_with_chromium) {
deps = [
":default_video_quality_analyzer_shared",
":multi_head_queue",
":multi_reader_queue",
"../../../api:array_view",
"../../../api:scoped_refptr",
"../../../api/numerics:numerics",
@ -895,10 +894,10 @@ if (!build_with_chromium) {
]
}
rtc_library("multi_head_queue") {
rtc_library("multi_reader_queue") {
visibility = [ "*" ]
testonly = true
sources = [ "analyzer/video/multi_head_queue.h" ]
sources = [ "analyzer/video/multi_reader_queue.h" ]
deps = [ "../../../rtc_base:checks" ]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}

View File

@ -39,7 +39,6 @@
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_internal_shared_objects.h"
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_shared_objects.h"
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
#include "test/pc/e2e/analyzer/video/names_collection.h"
#include "test/testsupport/perf_test.h"

View File

@ -15,7 +15,7 @@
#include "absl/types/optional.h"
#include "api/units/timestamp.h"
#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
#include "test/pc/e2e/analyzer/video/multi_reader_queue.h"
namespace webrtc {
@ -50,7 +50,7 @@ class StreamState {
// When new peer is added - all current alive frames will be sent to it as
// well. So we need to register them as expected by copying owner_ head to
// the new head.
void AddPeer() { frame_ids_.AddHead(GetAliveFramesQueueIndex()); }
void AddPeer() { frame_ids_.AddReader(GetAliveFramesQueueIndex()); }
size_t GetAliveFramesCount() const {
return frame_ids_.size(GetAliveFramesQueueIndex());
@ -86,7 +86,7 @@ class StreamState {
// If we received frame with id frame_id3, then we will pop frame_id1 and
// frame_id2 and consider those frames as dropped and then compare received
// frame with the one from `FrameInFlight` with id frame_id3.
MultiHeadQueue<uint16_t> frame_ids_;
MultiReaderQueue<uint16_t> frame_ids_;
std::map<size_t, Timestamp> last_rendered_frame_time_;
};

View File

@ -1,102 +0,0 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_
#include <deque>
#include <memory>
#include <vector>
#include "absl/types/optional.h"
#include "rtc_base/checks.h"
namespace webrtc {
// A queue that allows more than one reader. Readers are independent, and all
// readers will see all elements; an inserted element stays in the queue until
// all readers have extracted it. Elements are copied and copying is assumed to
// be cheap.
template <typename T>
class MultiHeadQueue {
public:
// Creates queue with exactly `readers_count` readers.
explicit MultiHeadQueue(size_t readers_count) {
for (size_t i = 0; i < readers_count; ++i) {
queues_.push_back(std::deque<T>());
}
}
// Creates a copy of an existing head. Complexity O(MultiHeadQueue::size()).
// `copy_index` - index of the queue that will be used as a source for
// copying.
void AddHead(size_t copy_index) { queues_.push_back(queues_[copy_index]); }
// Add value to the end of the queue. Complexity O(readers_count).
void PushBack(T value) {
for (auto& queue : queues_) {
queue.push_back(value);
}
}
// Extract element from specified head. Complexity O(1).
absl::optional<T> PopFront(size_t index) {
RTC_CHECK_LT(index, queues_.size());
if (queues_[index].empty()) {
return absl::nullopt;
}
T out = queues_[index].front();
queues_[index].pop_front();
return out;
}
// Returns element at specified head. Complexity O(1).
absl::optional<T> Front(size_t index) const {
RTC_CHECK_LT(index, queues_.size());
if (queues_[index].empty()) {
return absl::nullopt;
}
return queues_[index].front();
}
// Returns true if for specified head there are no more elements in the queue
// or false otherwise. Complexity O(1).
bool IsEmpty(size_t index) const {
RTC_CHECK_LT(index, queues_.size());
return queues_[index].empty();
}
// Returns size of the longest queue between all readers.
// Complexity O(readers_count).
size_t size() const {
size_t size = 0;
for (auto& queue : queues_) {
if (queue.size() > size) {
size = queue.size();
}
}
return size;
}
// Returns size of the specified queue. Complexity O(1).
size_t size(size_t index) const {
RTC_CHECK_LT(index, queues_.size());
return queues_[index].size();
}
size_t readers_count() const { return queues_.size(); }
private:
std::vector<std::deque<T>> queues_;
};
} // namespace webrtc
#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_HEAD_QUEUE_H_

View File

@ -0,0 +1,163 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
#define TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_
#include <deque>
#include <memory>
#include <set>
#include <unordered_map>
#include "absl/types/optional.h"
#include "rtc_base/checks.h"
namespace webrtc {
// Represents the queue which can be read by multiple readers. Each reader reads
// from its own queue head. When an element is added it will become visible for
// all readers. When an element will be removed by all the readers, the element
// will be removed from the queue.
template <typename T>
class MultiReaderQueue {
public:
// Creates queue with exactly `readers_count` readers named from 0 to
// `readers_count - 1`.
explicit MultiReaderQueue(size_t readers_count) {
for (size_t i = 0; i < readers_count; ++i) {
heads_[i] = 0;
}
}
// Creates queue with specified readers.
explicit MultiReaderQueue(std::set<size_t> readers) {
for (size_t reader : readers) {
heads_[reader] = 0;
}
}
// Adds a new reader, initializing its reading position (the reader's head)
// equal to the one of `reader_to_copy`. New reader will have name index
// equal to the current readers count.
// Complexity O(MultiReaderQueue::size(reader_to_copy)).
void AddReader(size_t reader_to_copy) {
AddReader(heads_.size(), reader_to_copy);
}
// Adds a new `reader`, initializing its reading position (the reader's head)
// equal to the one of `reader_to_copy`.
// Complexity O(MultiReaderQueue::size(reader_to_copy)).
void AddReader(size_t reader, size_t reader_to_copy) {
size_t pos = GetHeadPositionOrDie(reader_to_copy);
auto it = heads_.find(reader);
RTC_CHECK(it == heads_.end())
<< "Reader " << reader << " is already in the queue";
heads_[reader] = heads_[reader_to_copy];
for (size_t i = pos; i < queue_.size(); ++i) {
in_queues_[i]++;
}
}
// Removes specified `reader` from the queue.
// Complexity O(MultiReaderQueue::size(reader)).
void RemoveReader(size_t reader) {
size_t pos = GetHeadPositionOrDie(reader);
for (size_t i = pos; i < queue_.size(); ++i) {
in_queues_[i]--;
}
while (!in_queues_.empty() && in_queues_[0] == 0) {
PopFront();
}
heads_.erase(reader);
}
// Add value to the end of the queue. Complexity O(1).
void PushBack(T value) {
queue_.push_back(value);
in_queues_.push_back(heads_.size());
}
// Extract element from specified head. Complexity O(1).
absl::optional<T> PopFront(size_t reader) {
size_t pos = GetHeadPositionOrDie(reader);
if (pos >= queue_.size()) {
return absl::nullopt;
}
T out = queue_[pos];
in_queues_[pos]--;
heads_[reader]++;
if (in_queues_[pos] == 0) {
RTC_DCHECK_EQ(pos, 0);
PopFront();
}
return out;
}
// Returns element at specified head. Complexity O(1).
absl::optional<T> Front(size_t reader) const {
size_t pos = GetHeadPositionOrDie(reader);
if (pos >= queue_.size()) {
return absl::nullopt;
}
return queue_[pos];
}
// Returns true if for specified head there are no more elements in the queue
// or false otherwise. Complexity O(1).
bool IsEmpty(size_t reader) const {
size_t pos = GetHeadPositionOrDie(reader);
return pos >= queue_.size();
}
// Returns size of the longest queue between all readers.
// Complexity O(1).
size_t size() const { return queue_.size(); }
// Returns size of the specified queue. Complexity O(1).
size_t size(size_t reader) const {
size_t pos = GetHeadPositionOrDie(reader);
return queue_.size() - pos;
}
// Complexity O(1).
size_t readers_count() const { return heads_.size(); }
private:
size_t GetHeadPositionOrDie(size_t reader) const {
auto it = heads_.find(reader);
RTC_CHECK(it != heads_.end()) << "No queue for reader " << reader;
return it->second - removed_elements_count_;
}
void PopFront() {
RTC_DCHECK(!queue_.empty());
RTC_DCHECK_EQ(in_queues_[0], 0);
queue_.pop_front();
in_queues_.pop_front();
removed_elements_count_++;
}
// Number of the elements that were removed from the queue. It is used to
// subtract from each head to compute the right index inside `queue_`;
size_t removed_elements_count_ = 0;
std::deque<T> queue_;
// In how may queues the element at index `i` is. An element can be removed
// from the front if and only if it is in 0 queues.
std::deque<size_t> in_queues_;
// Map from the reader to the head position in the queue.
std::unordered_map<size_t, size_t> heads_;
};
} // namespace webrtc
#endif // TEST_PC_E2E_ANALYZER_VIDEO_MULTI_READER_QUEUE_H_

View File

@ -8,24 +8,57 @@
* be found in the AUTHORS file in the root of the source tree.
*/
#include "test/pc/e2e/analyzer/video/multi_head_queue.h"
#include "test/pc/e2e/analyzer/video/multi_reader_queue.h"
#include "absl/types/optional.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
TEST(MultiHeadQueueTest, GetOnEmpty) {
MultiHeadQueue<int> queue = MultiHeadQueue<int>(10);
EXPECT_TRUE(queue.IsEmpty(0));
TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) {
MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
EXPECT_EQ(queue.size(), 0lu);
for (int i = 0; i < 10; ++i) {
EXPECT_TRUE(queue.IsEmpty(i));
EXPECT_EQ(queue.size(i), 0lu);
EXPECT_FALSE(queue.PopFront(i).has_value());
EXPECT_FALSE(queue.Front(i).has_value());
}
}
TEST(MultiHeadQueueTest, SingleHeadOneAddOneRemove) {
MultiHeadQueue<int> queue = MultiHeadQueue<int>(1);
TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) {
MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
queue.PushBack(1);
queue.PushBack(2);
queue.PushBack(3);
EXPECT_EQ(queue.size(), 3lu);
for (int i = 0; i < 10; ++i) {
EXPECT_FALSE(queue.IsEmpty(i));
EXPECT_EQ(queue.size(i), 3lu);
}
}
TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) {
MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
for (int i = 0; i < 5; ++i) {
queue.PushBack(i);
}
EXPECT_EQ(queue.size(), 5lu);
// Removing elements from queue #0
for (int i = 0; i < 5; ++i) {
EXPECT_EQ(queue.size(0), static_cast<size_t>(5 - i));
EXPECT_EQ(queue.PopFront(0), absl::optional<int>(i));
for (int j = 1; j < 10; ++j) {
EXPECT_EQ(queue.size(j), 5lu);
}
}
EXPECT_EQ(queue.size(0), 0lu);
EXPECT_TRUE(queue.IsEmpty(0));
}
TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) {
MultiReaderQueue<int> queue = MultiReaderQueue<int>(1);
queue.PushBack(1);
EXPECT_EQ(queue.size(), 1lu);
EXPECT_TRUE(queue.Front(0).has_value());
@ -37,22 +70,21 @@ TEST(MultiHeadQueueTest, SingleHeadOneAddOneRemove) {
EXPECT_TRUE(queue.IsEmpty(0));
}
TEST(MultiHeadQueueTest, SingleHead) {
MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1);
TEST(MultiReaderQueueTest, SingleHead) {
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
for (size_t i = 0; i < 10; ++i) {
queue.PushBack(i);
EXPECT_EQ(queue.size(), i + 1);
}
for (size_t i = 0; i < 10; ++i) {
absl::optional<size_t> value = queue.PopFront(0);
EXPECT_EQ(queue.Front(0), absl::optional<size_t>(i));
EXPECT_EQ(queue.PopFront(0), absl::optional<size_t>(i));
EXPECT_EQ(queue.size(), 10 - i - 1);
ASSERT_TRUE(value.has_value());
EXPECT_EQ(value.value(), i);
}
}
TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(3);
TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
for (size_t i = 0; i < 10; ++i) {
queue.PushBack(i);
EXPECT_EQ(queue.size(), i + 1);
@ -77,8 +109,8 @@ TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
}
}
TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAll) {
MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(3);
TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) {
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
for (size_t i = 0; i < 10; ++i) {
queue.PushBack(i);
EXPECT_EQ(queue.size(), i + 1);
@ -97,14 +129,14 @@ TEST(MultiHeadQueueTest, ThreeHeadsAddAllRemoveAll) {
}
}
TEST(MultiHeadQueueTest, HeadCopy) {
MultiHeadQueue<size_t> queue = MultiHeadQueue<size_t>(1);
TEST(MultiReaderQueueTest, AddReader) {
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
for (size_t i = 0; i < 10; ++i) {
queue.PushBack(i);
EXPECT_EQ(queue.size(), i + 1);
}
queue.AddHead(0);
EXPECT_EQ(queue.readers_count(), 2u);
queue.AddReader(0);
EXPECT_EQ(queue.readers_count(), 2lu);
for (size_t i = 0; i < 10; ++i) {
absl::optional<size_t> value1 = queue.PopFront(0);
absl::optional<size_t> value2 = queue.PopFront(1);
@ -116,5 +148,31 @@ TEST(MultiHeadQueueTest, HeadCopy) {
}
}
TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) {
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(2);
for (size_t i = 0; i < 10; ++i) {
queue.PushBack(i);
}
EXPECT_EQ(queue.size(1), 10lu);
queue.RemoveReader(0);
EXPECT_EQ(queue.readers_count(), 1lu);
EXPECT_EQ(queue.size(1), 10lu);
}
TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) {
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
for (size_t i = 0; i < 10; ++i) {
queue.PushBack(i);
}
EXPECT_EQ(queue.size(), 10lu);
queue.RemoveReader(0);
EXPECT_EQ(queue.size(), 0lu);
EXPECT_EQ(queue.readers_count(), 0lu);
}
} // namespace
} // namespace webrtc