From 9c55fa5a0e530d6c29d29e608d2366525b4459d7 Mon Sep 17 00:00:00 2001 From: Artem Titov Date: Wed, 15 Jun 2022 12:24:24 +0200 Subject: [PATCH] [DVQA] Add support for removing peer from the StreamState Bug: b/231397778 Change-Id: I8ce1486f91f6c84e246e043f2a4e2dd94fc29d06 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265809 Reviewed-by: Mirko Bonadei Commit-Queue: Artem Titov Cr-Commit-Position: refs/heads/main@{#37220} --- test/pc/e2e/BUILD.gn | 15 +++ .../video/default_video_quality_analyzer.cc | 21 ++- ...ult_video_quality_analyzer_stream_state.cc | 83 +++++++----- ...ault_video_quality_analyzer_stream_state.h | 69 +++++----- ...ideo_quality_analyzer_stream_state_test.cc | 126 ++++++++++++++++++ .../e2e/analyzer/video/multi_reader_queue.h | 21 +-- .../analyzer/video/multi_reader_queue_test.cc | 120 ++++++++++------- 7 files changed, 325 insertions(+), 130 deletions(-) create mode 100644 test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc diff --git a/test/pc/e2e/BUILD.gn b/test/pc/e2e/BUILD.gn index 12db7fbe11..2e4410d361 100644 --- a/test/pc/e2e/BUILD.gn +++ b/test/pc/e2e/BUILD.gn @@ -35,6 +35,7 @@ if (!build_with_chromium) { deps = [ ":default_video_quality_analyzer_frames_comparator_test", + ":default_video_quality_analyzer_stream_state_test", ":default_video_quality_analyzer_test", ":multi_reader_queue_test", ":names_collection_test", @@ -594,6 +595,19 @@ if (!build_with_chromium) { ] absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] } + + rtc_library("default_video_quality_analyzer_stream_state_test") { + testonly = true + sources = [ + "analyzer/video/default_video_quality_analyzer_stream_state_test.cc", + ] + deps = [ + ":default_video_quality_analyzer_internal", + "../../../api/units:timestamp", + "../../../test:test_support", + ] + absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ] + } } rtc_library("analyzer_helper") { @@ -726,6 +740,7 @@ if (!build_with_chromium) { visibility = [ ":default_video_quality_analyzer", ":default_video_quality_analyzer_frames_comparator_test", + ":default_video_quality_analyzer_stream_state_test", ":names_collection_test", ] diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc index 5922c44e27..4426db4317 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer.cc @@ -193,12 +193,16 @@ uint16_t DefaultVideoQualityAnalyzer::OnFrameCaptured( } } + std::set frame_receivers_indexes = peers_->GetPresentIndexes(); + if (!options_.enable_receive_own_stream) { + frame_receivers_indexes.erase(peer_index); + } + auto state_it = stream_states_.find(stream_index); if (state_it == stream_states_.end()) { stream_states_.emplace( stream_index, - StreamState(peer_index, peers_->size(), - options_.enable_receive_own_stream, captured_time)); + StreamState(peer_index, frame_receivers_indexes, captured_time)); } StreamState* state = &stream_states_.at(stream_index); state->PushBack(frame_id); @@ -230,10 +234,6 @@ uint16_t DefaultVideoQualityAnalyzer::OnFrameCaptured( captured_frames_in_flight_.erase(it); } - std::set frame_receivers_indexes = peers_->GetPresentIndexes(); - if (!options_.enable_receive_own_stream) { - frame_receivers_indexes.erase(peer_index); - } captured_frames_in_flight_.emplace( frame_id, FrameInFlight(stream_index, frame, captured_time, std::move(frame_receivers_indexes))); @@ -470,7 +470,7 @@ void DefaultVideoQualityAnalyzer::OnFrameRendered( const size_t stream_index = frame_in_flight->stream(); StreamState* state = &stream_states_.at(stream_index); - const InternalStatsKey stats_key(stream_index, state->owner(), peer_index); + const InternalStatsKey stats_key(stream_index, state->sender(), peer_index); // Update frames counters. frame_counters_.rendered++; @@ -498,7 +498,6 @@ void DefaultVideoQualityAnalyzer::OnFrameRendered( auto dropped_frame_it = captured_frames_in_flight_.find(dropped_frame_id); RTC_DCHECK(dropped_frame_it != captured_frames_in_flight_.end()); - absl::optional dropped_frame = dropped_frame_it->second.frame(); dropped_frame_it->second.MarkDropped(peer_index); analyzer_stats_.frames_in_flight_left_count.AddSample( @@ -591,7 +590,7 @@ void DefaultVideoQualityAnalyzer::RegisterParticipantInCall( // Ensure, that frames states are handled correctly // (e.g. dropped frames tracking). for (auto& key_val : stream_states_) { - key_val.second.AddPeer(); + key_val.second.AddPeer(new_peer_index); } // Register new peer for every frame in flight. // It is guaranteed, that no garbage FrameInFlight objects will stay in @@ -624,11 +623,11 @@ void DefaultVideoQualityAnalyzer::Stop() { const size_t stream_index = state_entry.first; StreamState& stream_state = state_entry.second; for (size_t i = 0; i < peers_->size(); ++i) { - if (i == stream_state.owner() && !options_.enable_receive_own_stream) { + if (i == stream_state.sender() && !options_.enable_receive_own_stream) { continue; } - InternalStatsKey stats_key(stream_index, stream_state.owner(), i); + InternalStatsKey stats_key(stream_index, stream_state.sender(), i); // If there are no freezes in the call we have to report // time_between_freezes_ms as call duration and in such case diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc index f1d98b8350..d59ef12c63 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.cc @@ -11,6 +11,7 @@ #include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h" #include +#include #include "absl/types/optional.h" #include "api/units/timestamp.h" @@ -30,26 +31,33 @@ absl::optional MaybeGetValue(const std::map& map, size_t key) { } // namespace +StreamState::StreamState(size_t sender, + std::set receivers, + Timestamp stream_started_time) + : sender_(sender), + stream_started_time_(stream_started_time), + receivers_(receivers), + frame_ids_(std::move(receivers)) { + frame_ids_.AddReader(kAliveFramesQueueIndex); + RTC_CHECK_NE(sender_, kAliveFramesQueueIndex); + for (size_t receiver : receivers_) { + RTC_CHECK_NE(receiver, kAliveFramesQueueIndex); + } +} + uint16_t StreamState::PopFront(size_t peer) { - size_t peer_queue = GetPeerQueueIndex(peer); - size_t alive_frames_queue = GetAliveFramesQueueIndex(); - absl::optional frame_id = frame_ids_.PopFront(peer_queue); + RTC_CHECK_NE(peer, kAliveFramesQueueIndex); + absl::optional frame_id = frame_ids_.PopFront(peer); RTC_DCHECK(frame_id.has_value()); // If alive's frame queue is longer than all others, than also pop frame from // it, because that frame is received by all receivers. - size_t alive_size = frame_ids_.size(alive_frames_queue); - size_t other_size = 0; - for (size_t i = 0; i < frame_ids_.readers_count(); ++i) { - size_t cur_size = frame_ids_.size(i); - if (i != alive_frames_queue && cur_size > other_size) { - other_size = cur_size; - } - } + size_t alive_size = frame_ids_.size(kAliveFramesQueueIndex); + size_t other_size = GetLongestReceiverQueue(); // Pops frame from alive queue if alive's queue is the longest one. if (alive_size > other_size) { absl::optional alive_frame_id = - frame_ids_.PopFront(alive_frames_queue); + frame_ids_.PopFront(kAliveFramesQueueIndex); RTC_DCHECK(alive_frame_id.has_value()); RTC_DCHECK_EQ(frame_id.value(), alive_frame_id.value()); } @@ -57,9 +65,30 @@ uint16_t StreamState::PopFront(size_t peer) { return frame_id.value(); } +void StreamState::AddPeer(size_t peer) { + RTC_CHECK_NE(peer, kAliveFramesQueueIndex); + frame_ids_.AddReader(peer, kAliveFramesQueueIndex); + receivers_.insert(peer); +} + +void StreamState::RemovePeer(size_t peer) { + RTC_CHECK_NE(peer, kAliveFramesQueueIndex); + frame_ids_.RemoveReader(peer); + receivers_.erase(peer); + + // If we removed the last receiver for the alive frames, we need to pop them + // from the queue, because now they received by all receivers. + size_t alive_size = frame_ids_.size(kAliveFramesQueueIndex); + size_t other_size = GetLongestReceiverQueue(); + while (alive_size > other_size) { + frame_ids_.PopFront(kAliveFramesQueueIndex); + alive_size--; + } +} + uint16_t StreamState::MarkNextAliveFrameAsDead() { absl::optional frame_id = - frame_ids_.PopFront(GetAliveFramesQueueIndex()); + frame_ids_.PopFront(kAliveFramesQueueIndex); RTC_DCHECK(frame_id.has_value()); return frame_id.value(); } @@ -78,27 +107,15 @@ absl::optional StreamState::last_rendered_frame_time( return MaybeGetValue(last_rendered_frame_time_, peer); } -size_t StreamState::GetPeerQueueIndex(size_t peer_index) const { - // When sender isn't expecting to receive its own stream we will use their - // queue for tracking alive frames. Otherwise we will use the queue #0 to - // track alive frames and will shift all other queues for peers on 1. - // It means when `enable_receive_own_stream_` is true peer's queue will have - // index equal to `peer_index` + 1 and when `enable_receive_own_stream_` is - // false peer's queue will have index equal to `peer_index`. - if (!enable_receive_own_stream_) { - return peer_index; +size_t StreamState::GetLongestReceiverQueue() const { + size_t max = 0; + for (size_t receiver : receivers_) { + size_t cur_size = frame_ids_.size(receiver); + if (cur_size > max) { + max = cur_size; + } } - return peer_index + 1; -} - -size_t StreamState::GetAliveFramesQueueIndex() const { - // When sender isn't expecting to receive its own stream we will use their - // queue for tracking alive frames. Otherwise we will use the queue #0 to - // track alive frames and will shift all other queues for peers on 1. - if (!enable_receive_own_stream_) { - return owner_; - } - return 0; + return max; } } // namespace webrtc diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h index 8cf41a3eaa..829a79c7bf 100644 --- a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h @@ -11,7 +11,9 @@ #ifndef TEST_PC_E2E_ANALYZER_VIDEO_DEFAULT_VIDEO_QUALITY_ANALYZER_STREAM_STATE_H_ #define TEST_PC_E2E_ANALYZER_VIDEO_DEFAULT_VIDEO_QUALITY_ANALYZER_STREAM_STATE_H_ +#include #include +#include #include "absl/types/optional.h" #include "api/units/timestamp.h" @@ -21,39 +23,44 @@ namespace webrtc { // Represents a current state of video stream inside // DefaultVideoQualityAnalyzer. +// +// Maintains the sequence of frames for each video stream and keeps track about +// which frames were seen by each of the possible stream receiver. +// +// Keeps information about which frames are alive and which are dead. Frame is +// alive if it contains VideoFrame payload for corresponding FrameInFlight +// object inside DefaultVideoQualityAnalyzer, otherwise frame is considered +// dead. +// +// Supports peer indexes from 0 to max(size_t) - 1. class StreamState { public: - StreamState(size_t owner, - size_t peers_count, - bool enable_receive_own_stream, - Timestamp stream_started_time) - : owner_(owner), - enable_receive_own_stream_(enable_receive_own_stream), - stream_started_time_(stream_started_time), - frame_ids_(enable_receive_own_stream ? peers_count + 1 : peers_count) {} + StreamState(size_t sender, + std::set receivers, + Timestamp stream_started_time); - size_t owner() const { return owner_; } + size_t sender() const { return sender_; } Timestamp stream_started_time() const { return stream_started_time_; } void PushBack(uint16_t frame_id) { frame_ids_.PushBack(frame_id); } - // Crash if state is empty. Guarantees that there can be no alive frames - // that are not in the owner queue - uint16_t PopFront(size_t peer); - bool IsEmpty(size_t peer) const { - return frame_ids_.IsEmpty(GetPeerQueueIndex(peer)); - } // Crash if state is empty. - uint16_t Front(size_t peer) const { - return frame_ids_.Front(GetPeerQueueIndex(peer)).value(); - } + uint16_t PopFront(size_t peer); + bool IsEmpty(size_t peer) const { return frame_ids_.IsEmpty(peer); } + // Crash if state is empty. + uint16_t Front(size_t peer) const { return frame_ids_.Front(peer).value(); } - // When new peer is added - all current alive frames will be sent to it as - // well. So we need to register them as expected by copying owner_ head to - // the new head. - void AddPeer() { frame_ids_.AddReader(GetAliveFramesQueueIndex()); } + // Adds a new peer to the state. All currently alive frames will be expected + // to be received by the newly added peer. + void AddPeer(size_t peer); + + // Removes peer from the state. Frames that were expected to be received by + // this peer will be removed from it. On the other hand last rendered frame + // time for the removed peer will be preserved, because + // DefaultVideoQualityAnalyzer still may request it for stats processing. + void RemovePeer(size_t peer); size_t GetAliveFramesCount() const { - return frame_ids_.size(GetAliveFramesQueueIndex()); + return frame_ids_.size(kAliveFramesQueueIndex); } uint16_t MarkNextAliveFrameAsDead(); @@ -61,19 +68,17 @@ class StreamState { absl::optional last_rendered_frame_time(size_t peer) const; private: - // Returns index of the `frame_ids_` queue which is used for specified - // `peer_index`. - size_t GetPeerQueueIndex(size_t peer_index) const; + // Index of the `frame_ids_` queue which is used to track alive frames for + // this stream. + static constexpr size_t kAliveFramesQueueIndex = + std::numeric_limits::max(); - // Returns index of the `frame_ids_` queue which is used to track alive - // frames for this stream. The frame is alive if it contains VideoFrame - // payload in `captured_frames_in_flight_`. - size_t GetAliveFramesQueueIndex() const; + size_t GetLongestReceiverQueue() const; // Index of the owner. Owner's queue in `frame_ids_` will keep alive frames. - const size_t owner_; - const bool enable_receive_own_stream_; + const size_t sender_; const Timestamp stream_started_time_; + std::set receivers_; // To correctly determine dropped frames we have to know sequence of frames // in each stream so we will keep a list of frame ids inside the stream. // This list is represented by multi head queue of frame ids with separate diff --git a/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc new file mode 100644 index 0000000000..01a6aab28a --- /dev/null +++ b/test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state_test.cc @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2022 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h" + +#include + +#include "api/units/timestamp.h" +#include "test/gtest.h" + +namespace webrtc { +namespace { + +TEST(StreamStateTest, PopFrontAndFrontIndependentForEachPeer) { + StreamState state(/*sender=*/0, + /*receivers=*/std::set{1, 2}, + Timestamp::Seconds(1)); + state.PushBack(/*frame_id=*/1); + state.PushBack(/*frame_id=*/2); + + EXPECT_EQ(state.Front(/*peer=*/1), 1); + EXPECT_EQ(state.PopFront(/*peer=*/1), 1); + EXPECT_EQ(state.Front(/*peer=*/1), 2); + EXPECT_EQ(state.PopFront(/*peer=*/1), 2); + EXPECT_EQ(state.Front(/*peer=*/2), 1); + EXPECT_EQ(state.PopFront(/*peer=*/2), 1); + EXPECT_EQ(state.Front(/*peer=*/2), 2); + EXPECT_EQ(state.PopFront(/*peer=*/2), 2); +} + +TEST(StreamStateTest, IsEmpty) { + StreamState state(/*sender=*/0, + /*receivers=*/std::set{1, 2}, + Timestamp::Seconds(1)); + state.PushBack(/*frame_id=*/1); + + EXPECT_FALSE(state.IsEmpty(/*peer=*/1)); + + state.PopFront(/*peer=*/1); + + EXPECT_TRUE(state.IsEmpty(/*peer=*/1)); +} + +TEST(StreamStateTest, PopFrontForOnlyOnePeerDontChangeAliveFramesCount) { + StreamState state(/*sender=*/0, + /*receivers=*/std::set{1, 2}, + Timestamp::Seconds(1)); + state.PushBack(/*frame_id=*/1); + state.PushBack(/*frame_id=*/2); + + EXPECT_EQ(state.GetAliveFramesCount(), 2lu); + + state.PopFront(/*peer=*/1); + state.PopFront(/*peer=*/1); + + EXPECT_EQ(state.GetAliveFramesCount(), 2lu); +} + +TEST(StreamStateTest, PopFrontForAllPeersReducesAliveFramesCount) { + StreamState state(/*sender=*/0, + /*receivers=*/std::set{1, 2}, + Timestamp::Seconds(1)); + state.PushBack(/*frame_id=*/1); + state.PushBack(/*frame_id=*/2); + + EXPECT_EQ(state.GetAliveFramesCount(), 2lu); + + state.PopFront(/*peer=*/1); + state.PopFront(/*peer=*/2); + + EXPECT_EQ(state.GetAliveFramesCount(), 1lu); +} + +TEST(StreamStateTest, RemovePeerForLastExpectedReceiverUpdatesAliveFrames) { + StreamState state(/*sender=*/0, + /*receivers=*/std::set{1, 2}, + Timestamp::Seconds(1)); + state.PushBack(/*frame_id=*/1); + state.PushBack(/*frame_id=*/2); + + state.PopFront(/*peer=*/1); + + EXPECT_EQ(state.GetAliveFramesCount(), 2lu); + + state.RemovePeer(/*peer=*/2); + + EXPECT_EQ(state.GetAliveFramesCount(), 1lu); +} + +TEST(StreamStateTest, MarkNextAliveFrameAsDeadDecreseAliveFramesCount) { + StreamState state(/*sender=*/0, + /*receivers=*/std::set{1, 2}, + Timestamp::Seconds(1)); + state.PushBack(/*frame_id=*/1); + state.PushBack(/*frame_id=*/2); + + EXPECT_EQ(state.GetAliveFramesCount(), 2lu); + + state.MarkNextAliveFrameAsDead(); + + EXPECT_EQ(state.GetAliveFramesCount(), 1lu); +} + +TEST(StreamStateTest, MarkNextAliveFrameAsDeadDoesntAffectFrontFrameForPeer) { + StreamState state(/*sender=*/0, + /*receivers=*/std::set{1, 2}, + Timestamp::Seconds(1)); + state.PushBack(/*frame_id=*/1); + state.PushBack(/*frame_id=*/2); + + EXPECT_EQ(state.Front(/*peer=*/1), 1); + + state.MarkNextAliveFrameAsDead(); + + EXPECT_EQ(state.Front(/*peer=*/1), 1); +} + +} // namespace +} // namespace webrtc diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue.h b/test/pc/e2e/analyzer/video/multi_reader_queue.h index c8a7db0cec..39d26b42bc 100644 --- a/test/pc/e2e/analyzer/video/multi_reader_queue.h +++ b/test/pc/e2e/analyzer/video/multi_reader_queue.h @@ -42,14 +42,6 @@ class MultiReaderQueue { } } - // Adds a new reader, initializing its reading position (the reader's head) - // equal to the one of `reader_to_copy`. New reader will have name index - // equal to the current readers count. - // Complexity O(MultiReaderQueue::size(reader_to_copy)). - void AddReader(size_t reader_to_copy) { - AddReader(heads_.size(), reader_to_copy); - } - // Adds a new `reader`, initializing its reading position (the reader's head) // equal to the one of `reader_to_copy`. // Complexity O(MultiReaderQueue::size(reader_to_copy)). @@ -65,6 +57,19 @@ class MultiReaderQueue { } } + // Adds a new `reader`, initializing its reading position equal to first + // element in the queue. + // Complexity O(MultiReaderQueue::size()). + void AddReader(size_t reader) { + auto it = heads_.find(reader); + RTC_CHECK(it == heads_.end()) + << "Reader " << reader << " is already in the queue"; + heads_[reader] = removed_elements_count_; + for (size_t i = 0; i < queue_.size(); ++i) { + in_queues_[i]++; + } + } + // Removes specified `reader` from the queue. // Complexity O(MultiReaderQueue::size(reader)). void RemoveReader(size_t reader) { diff --git a/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc index e41c4a8208..ea6aa0a416 100644 --- a/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc +++ b/test/pc/e2e/analyzer/video/multi_reader_queue_test.cc @@ -17,92 +17,94 @@ namespace webrtc { namespace { TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) { - MultiReaderQueue queue = MultiReaderQueue(10); + MultiReaderQueue queue = MultiReaderQueue(/*readers_count=*/10); EXPECT_EQ(queue.size(), 0lu); for (int i = 0; i < 10; ++i) { - EXPECT_TRUE(queue.IsEmpty(i)); - EXPECT_EQ(queue.size(i), 0lu); - EXPECT_FALSE(queue.PopFront(i).has_value()); - EXPECT_FALSE(queue.Front(i).has_value()); + EXPECT_TRUE(queue.IsEmpty(/*reader=*/i)); + EXPECT_EQ(queue.size(/*reader=*/i), 0lu); + EXPECT_FALSE(queue.PopFront(/*reader=*/i).has_value()); + EXPECT_FALSE(queue.Front(/*reader=*/i).has_value()); } } TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) { - MultiReaderQueue queue = MultiReaderQueue(10); + MultiReaderQueue queue = MultiReaderQueue(/*readers_count=*/10); queue.PushBack(1); queue.PushBack(2); queue.PushBack(3); EXPECT_EQ(queue.size(), 3lu); for (int i = 0; i < 10; ++i) { - EXPECT_FALSE(queue.IsEmpty(i)); - EXPECT_EQ(queue.size(i), 3lu); + EXPECT_FALSE(queue.IsEmpty(/*reader=*/i)); + EXPECT_EQ(queue.size(/*reader=*/i), 3lu); } } TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) { - MultiReaderQueue queue = MultiReaderQueue(10); + MultiReaderQueue queue = MultiReaderQueue(/*readers_count=*/10); for (int i = 0; i < 5; ++i) { queue.PushBack(i); } EXPECT_EQ(queue.size(), 5lu); // Removing elements from queue #0 for (int i = 0; i < 5; ++i) { - EXPECT_EQ(queue.size(0), static_cast(5 - i)); - EXPECT_EQ(queue.PopFront(0), absl::optional(i)); + EXPECT_EQ(queue.size(/*reader=*/0), static_cast(5 - i)); + EXPECT_EQ(queue.PopFront(/*reader=*/0), absl::optional(i)); for (int j = 1; j < 10; ++j) { - EXPECT_EQ(queue.size(j), 5lu); + EXPECT_EQ(queue.size(/*reader=*/j), 5lu); } } - EXPECT_EQ(queue.size(0), 0lu); - EXPECT_TRUE(queue.IsEmpty(0)); + EXPECT_EQ(queue.size(/*reader=*/0), 0lu); + EXPECT_TRUE(queue.IsEmpty(/*reader=*/0)); } TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) { - MultiReaderQueue queue = MultiReaderQueue(1); + MultiReaderQueue queue = MultiReaderQueue(/*readers_count=*/1); queue.PushBack(1); EXPECT_EQ(queue.size(), 1lu); - EXPECT_TRUE(queue.Front(0).has_value()); - EXPECT_EQ(queue.Front(0).value(), 1); - absl::optional value = queue.PopFront(0); + EXPECT_TRUE(queue.Front(/*reader=*/0).has_value()); + EXPECT_EQ(queue.Front(/*reader=*/0).value(), 1); + absl::optional value = queue.PopFront(/*reader=*/0); EXPECT_TRUE(value.has_value()); EXPECT_EQ(value.value(), 1); EXPECT_EQ(queue.size(), 0lu); - EXPECT_TRUE(queue.IsEmpty(0)); + EXPECT_TRUE(queue.IsEmpty(/*reader=*/0)); } TEST(MultiReaderQueueTest, SingleHead) { - MultiReaderQueue queue = MultiReaderQueue(1); + MultiReaderQueue queue = + MultiReaderQueue(/*readers_count=*/1); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); EXPECT_EQ(queue.size(), i + 1); } for (size_t i = 0; i < 10; ++i) { - EXPECT_EQ(queue.Front(0), absl::optional(i)); - EXPECT_EQ(queue.PopFront(0), absl::optional(i)); + EXPECT_EQ(queue.Front(/*reader=*/0), absl::optional(i)); + EXPECT_EQ(queue.PopFront(/*reader=*/0), absl::optional(i)); EXPECT_EQ(queue.size(), 10 - i - 1); } } TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) { - MultiReaderQueue queue = MultiReaderQueue(3); + MultiReaderQueue queue = + MultiReaderQueue(/*readers_count=*/3); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); EXPECT_EQ(queue.size(), i + 1); } for (size_t i = 0; i < 10; ++i) { - absl::optional value = queue.PopFront(0); + absl::optional value = queue.PopFront(/*reader=*/0); EXPECT_EQ(queue.size(), 10lu); ASSERT_TRUE(value.has_value()); EXPECT_EQ(value.value(), i); } for (size_t i = 0; i < 10; ++i) { - absl::optional value = queue.PopFront(1); + absl::optional value = queue.PopFront(/*reader=*/1); EXPECT_EQ(queue.size(), 10lu); ASSERT_TRUE(value.has_value()); EXPECT_EQ(value.value(), i); } for (size_t i = 0; i < 10; ++i) { - absl::optional value = queue.PopFront(2); + absl::optional value = queue.PopFront(/*reader=*/2); EXPECT_EQ(queue.size(), 10 - i - 1); ASSERT_TRUE(value.has_value()); EXPECT_EQ(value.value(), i); @@ -110,15 +112,16 @@ TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) { } TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) { - MultiReaderQueue queue = MultiReaderQueue(3); + MultiReaderQueue queue = + MultiReaderQueue(/*readers_count=*/3); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); EXPECT_EQ(queue.size(), i + 1); } for (size_t i = 0; i < 10; ++i) { - absl::optional value1 = queue.PopFront(0); - absl::optional value2 = queue.PopFront(1); - absl::optional value3 = queue.PopFront(2); + absl::optional value1 = queue.PopFront(/*reader=*/0); + absl::optional value2 = queue.PopFront(/*reader=*/1); + absl::optional value3 = queue.PopFront(/*reader=*/2); EXPECT_EQ(queue.size(), 10 - i - 1); ASSERT_TRUE(value1.has_value()); ASSERT_TRUE(value2.has_value()); @@ -129,40 +132,65 @@ TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) { } } -TEST(MultiReaderQueueTest, AddReader) { - MultiReaderQueue queue = MultiReaderQueue(1); +TEST(MultiReaderQueueTest, AddReaderSeeElementsOnlyFromReaderToCopy) { + MultiReaderQueue queue = + MultiReaderQueue(/*readers_count=*/2); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); - EXPECT_EQ(queue.size(), i + 1); } - queue.AddReader(0); - EXPECT_EQ(queue.readers_count(), 2lu); + for (size_t i = 0; i < 5; ++i) { + queue.PopFront(0); + } + + queue.AddReader(/*reader=*/2, /*reader_to_copy=*/0); + + EXPECT_EQ(queue.readers_count(), 3lu); + for (size_t i = 5; i < 10; ++i) { + absl::optional value = queue.PopFront(/*reader=*/2); + EXPECT_EQ(queue.size(/*reader=*/2), 10 - i - 1); + ASSERT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), i); + } +} + +TEST(MultiReaderQueueTest, AddReaderWithoutReaderToCopySeeFullQueue) { + MultiReaderQueue queue = + MultiReaderQueue(/*readers_count=*/2); for (size_t i = 0; i < 10; ++i) { - absl::optional value1 = queue.PopFront(0); - absl::optional value2 = queue.PopFront(1); - EXPECT_EQ(queue.size(), 10 - i - 1); - ASSERT_TRUE(value1.has_value()); - ASSERT_TRUE(value2.has_value()); - EXPECT_EQ(value1.value(), i); - EXPECT_EQ(value2.value(), i); + queue.PushBack(i); + } + for (size_t i = 0; i < 5; ++i) { + queue.PopFront(/*reader=*/0); + } + + queue.AddReader(/*reader=*/2); + + EXPECT_EQ(queue.readers_count(), 3lu); + for (size_t i = 0; i < 10; ++i) { + absl::optional value = queue.PopFront(/*reader=*/2); + EXPECT_EQ(queue.size(/*reader=*/2), 10 - i - 1); + ASSERT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), i); } } TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) { - MultiReaderQueue queue = MultiReaderQueue(2); + MultiReaderQueue queue = + MultiReaderQueue(/*readers_count=*/2); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); } - EXPECT_EQ(queue.size(1), 10lu); + EXPECT_EQ(queue.size(/*reader=*/1), 10lu); queue.RemoveReader(0); EXPECT_EQ(queue.readers_count(), 1lu); - EXPECT_EQ(queue.size(1), 10lu); + EXPECT_EQ(queue.size(/*reader=*/1), 10lu); } TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) { - MultiReaderQueue queue = MultiReaderQueue(1); + MultiReaderQueue queue = + MultiReaderQueue(/*readers_count=*/1); for (size_t i = 0; i < 10; ++i) { queue.PushBack(i); }