[DVQA] Add support for removing peer from the StreamState
Bug: b/231397778 Change-Id: I8ce1486f91f6c84e246e043f2a4e2dd94fc29d06 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265809 Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org> Commit-Queue: Artem Titov <titovartem@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37220}
This commit is contained in:

committed by
WebRTC LUCI CQ

parent
105711e9ad
commit
9c55fa5a0e
@ -35,6 +35,7 @@ if (!build_with_chromium) {
|
|||||||
|
|
||||||
deps = [
|
deps = [
|
||||||
":default_video_quality_analyzer_frames_comparator_test",
|
":default_video_quality_analyzer_frames_comparator_test",
|
||||||
|
":default_video_quality_analyzer_stream_state_test",
|
||||||
":default_video_quality_analyzer_test",
|
":default_video_quality_analyzer_test",
|
||||||
":multi_reader_queue_test",
|
":multi_reader_queue_test",
|
||||||
":names_collection_test",
|
":names_collection_test",
|
||||||
@ -594,6 +595,19 @@ if (!build_with_chromium) {
|
|||||||
]
|
]
|
||||||
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
|
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rtc_library("default_video_quality_analyzer_stream_state_test") {
|
||||||
|
testonly = true
|
||||||
|
sources = [
|
||||||
|
"analyzer/video/default_video_quality_analyzer_stream_state_test.cc",
|
||||||
|
]
|
||||||
|
deps = [
|
||||||
|
":default_video_quality_analyzer_internal",
|
||||||
|
"../../../api/units:timestamp",
|
||||||
|
"../../../test:test_support",
|
||||||
|
]
|
||||||
|
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rtc_library("analyzer_helper") {
|
rtc_library("analyzer_helper") {
|
||||||
@ -726,6 +740,7 @@ if (!build_with_chromium) {
|
|||||||
visibility = [
|
visibility = [
|
||||||
":default_video_quality_analyzer",
|
":default_video_quality_analyzer",
|
||||||
":default_video_quality_analyzer_frames_comparator_test",
|
":default_video_quality_analyzer_frames_comparator_test",
|
||||||
|
":default_video_quality_analyzer_stream_state_test",
|
||||||
":names_collection_test",
|
":names_collection_test",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -193,12 +193,16 @@ uint16_t DefaultVideoQualityAnalyzer::OnFrameCaptured(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::set<size_t> frame_receivers_indexes = peers_->GetPresentIndexes();
|
||||||
|
if (!options_.enable_receive_own_stream) {
|
||||||
|
frame_receivers_indexes.erase(peer_index);
|
||||||
|
}
|
||||||
|
|
||||||
auto state_it = stream_states_.find(stream_index);
|
auto state_it = stream_states_.find(stream_index);
|
||||||
if (state_it == stream_states_.end()) {
|
if (state_it == stream_states_.end()) {
|
||||||
stream_states_.emplace(
|
stream_states_.emplace(
|
||||||
stream_index,
|
stream_index,
|
||||||
StreamState(peer_index, peers_->size(),
|
StreamState(peer_index, frame_receivers_indexes, captured_time));
|
||||||
options_.enable_receive_own_stream, captured_time));
|
|
||||||
}
|
}
|
||||||
StreamState* state = &stream_states_.at(stream_index);
|
StreamState* state = &stream_states_.at(stream_index);
|
||||||
state->PushBack(frame_id);
|
state->PushBack(frame_id);
|
||||||
@ -230,10 +234,6 @@ uint16_t DefaultVideoQualityAnalyzer::OnFrameCaptured(
|
|||||||
|
|
||||||
captured_frames_in_flight_.erase(it);
|
captured_frames_in_flight_.erase(it);
|
||||||
}
|
}
|
||||||
std::set<size_t> frame_receivers_indexes = peers_->GetPresentIndexes();
|
|
||||||
if (!options_.enable_receive_own_stream) {
|
|
||||||
frame_receivers_indexes.erase(peer_index);
|
|
||||||
}
|
|
||||||
captured_frames_in_flight_.emplace(
|
captured_frames_in_flight_.emplace(
|
||||||
frame_id, FrameInFlight(stream_index, frame, captured_time,
|
frame_id, FrameInFlight(stream_index, frame, captured_time,
|
||||||
std::move(frame_receivers_indexes)));
|
std::move(frame_receivers_indexes)));
|
||||||
@ -470,7 +470,7 @@ void DefaultVideoQualityAnalyzer::OnFrameRendered(
|
|||||||
|
|
||||||
const size_t stream_index = frame_in_flight->stream();
|
const size_t stream_index = frame_in_flight->stream();
|
||||||
StreamState* state = &stream_states_.at(stream_index);
|
StreamState* state = &stream_states_.at(stream_index);
|
||||||
const InternalStatsKey stats_key(stream_index, state->owner(), peer_index);
|
const InternalStatsKey stats_key(stream_index, state->sender(), peer_index);
|
||||||
|
|
||||||
// Update frames counters.
|
// Update frames counters.
|
||||||
frame_counters_.rendered++;
|
frame_counters_.rendered++;
|
||||||
@ -498,7 +498,6 @@ void DefaultVideoQualityAnalyzer::OnFrameRendered(
|
|||||||
|
|
||||||
auto dropped_frame_it = captured_frames_in_flight_.find(dropped_frame_id);
|
auto dropped_frame_it = captured_frames_in_flight_.find(dropped_frame_id);
|
||||||
RTC_DCHECK(dropped_frame_it != captured_frames_in_flight_.end());
|
RTC_DCHECK(dropped_frame_it != captured_frames_in_flight_.end());
|
||||||
absl::optional<VideoFrame> dropped_frame = dropped_frame_it->second.frame();
|
|
||||||
dropped_frame_it->second.MarkDropped(peer_index);
|
dropped_frame_it->second.MarkDropped(peer_index);
|
||||||
|
|
||||||
analyzer_stats_.frames_in_flight_left_count.AddSample(
|
analyzer_stats_.frames_in_flight_left_count.AddSample(
|
||||||
@ -591,7 +590,7 @@ void DefaultVideoQualityAnalyzer::RegisterParticipantInCall(
|
|||||||
// Ensure, that frames states are handled correctly
|
// Ensure, that frames states are handled correctly
|
||||||
// (e.g. dropped frames tracking).
|
// (e.g. dropped frames tracking).
|
||||||
for (auto& key_val : stream_states_) {
|
for (auto& key_val : stream_states_) {
|
||||||
key_val.second.AddPeer();
|
key_val.second.AddPeer(new_peer_index);
|
||||||
}
|
}
|
||||||
// Register new peer for every frame in flight.
|
// Register new peer for every frame in flight.
|
||||||
// It is guaranteed, that no garbage FrameInFlight objects will stay in
|
// It is guaranteed, that no garbage FrameInFlight objects will stay in
|
||||||
@ -624,11 +623,11 @@ void DefaultVideoQualityAnalyzer::Stop() {
|
|||||||
const size_t stream_index = state_entry.first;
|
const size_t stream_index = state_entry.first;
|
||||||
StreamState& stream_state = state_entry.second;
|
StreamState& stream_state = state_entry.second;
|
||||||
for (size_t i = 0; i < peers_->size(); ++i) {
|
for (size_t i = 0; i < peers_->size(); ++i) {
|
||||||
if (i == stream_state.owner() && !options_.enable_receive_own_stream) {
|
if (i == stream_state.sender() && !options_.enable_receive_own_stream) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
InternalStatsKey stats_key(stream_index, stream_state.owner(), i);
|
InternalStatsKey stats_key(stream_index, stream_state.sender(), i);
|
||||||
|
|
||||||
// If there are no freezes in the call we have to report
|
// If there are no freezes in the call we have to report
|
||||||
// time_between_freezes_ms as call duration and in such case
|
// time_between_freezes_ms as call duration and in such case
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
|
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
|
||||||
|
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <set>
|
||||||
|
|
||||||
#include "absl/types/optional.h"
|
#include "absl/types/optional.h"
|
||||||
#include "api/units/timestamp.h"
|
#include "api/units/timestamp.h"
|
||||||
@ -30,26 +31,33 @@ absl::optional<T> MaybeGetValue(const std::map<size_t, T>& map, size_t key) {
|
|||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
|
StreamState::StreamState(size_t sender,
|
||||||
|
std::set<size_t> receivers,
|
||||||
|
Timestamp stream_started_time)
|
||||||
|
: sender_(sender),
|
||||||
|
stream_started_time_(stream_started_time),
|
||||||
|
receivers_(receivers),
|
||||||
|
frame_ids_(std::move(receivers)) {
|
||||||
|
frame_ids_.AddReader(kAliveFramesQueueIndex);
|
||||||
|
RTC_CHECK_NE(sender_, kAliveFramesQueueIndex);
|
||||||
|
for (size_t receiver : receivers_) {
|
||||||
|
RTC_CHECK_NE(receiver, kAliveFramesQueueIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
uint16_t StreamState::PopFront(size_t peer) {
|
uint16_t StreamState::PopFront(size_t peer) {
|
||||||
size_t peer_queue = GetPeerQueueIndex(peer);
|
RTC_CHECK_NE(peer, kAliveFramesQueueIndex);
|
||||||
size_t alive_frames_queue = GetAliveFramesQueueIndex();
|
absl::optional<uint16_t> frame_id = frame_ids_.PopFront(peer);
|
||||||
absl::optional<uint16_t> frame_id = frame_ids_.PopFront(peer_queue);
|
|
||||||
RTC_DCHECK(frame_id.has_value());
|
RTC_DCHECK(frame_id.has_value());
|
||||||
|
|
||||||
// If alive's frame queue is longer than all others, than also pop frame from
|
// If alive's frame queue is longer than all others, than also pop frame from
|
||||||
// it, because that frame is received by all receivers.
|
// it, because that frame is received by all receivers.
|
||||||
size_t alive_size = frame_ids_.size(alive_frames_queue);
|
size_t alive_size = frame_ids_.size(kAliveFramesQueueIndex);
|
||||||
size_t other_size = 0;
|
size_t other_size = GetLongestReceiverQueue();
|
||||||
for (size_t i = 0; i < frame_ids_.readers_count(); ++i) {
|
|
||||||
size_t cur_size = frame_ids_.size(i);
|
|
||||||
if (i != alive_frames_queue && cur_size > other_size) {
|
|
||||||
other_size = cur_size;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Pops frame from alive queue if alive's queue is the longest one.
|
// Pops frame from alive queue if alive's queue is the longest one.
|
||||||
if (alive_size > other_size) {
|
if (alive_size > other_size) {
|
||||||
absl::optional<uint16_t> alive_frame_id =
|
absl::optional<uint16_t> alive_frame_id =
|
||||||
frame_ids_.PopFront(alive_frames_queue);
|
frame_ids_.PopFront(kAliveFramesQueueIndex);
|
||||||
RTC_DCHECK(alive_frame_id.has_value());
|
RTC_DCHECK(alive_frame_id.has_value());
|
||||||
RTC_DCHECK_EQ(frame_id.value(), alive_frame_id.value());
|
RTC_DCHECK_EQ(frame_id.value(), alive_frame_id.value());
|
||||||
}
|
}
|
||||||
@ -57,9 +65,30 @@ uint16_t StreamState::PopFront(size_t peer) {
|
|||||||
return frame_id.value();
|
return frame_id.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void StreamState::AddPeer(size_t peer) {
|
||||||
|
RTC_CHECK_NE(peer, kAliveFramesQueueIndex);
|
||||||
|
frame_ids_.AddReader(peer, kAliveFramesQueueIndex);
|
||||||
|
receivers_.insert(peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
void StreamState::RemovePeer(size_t peer) {
|
||||||
|
RTC_CHECK_NE(peer, kAliveFramesQueueIndex);
|
||||||
|
frame_ids_.RemoveReader(peer);
|
||||||
|
receivers_.erase(peer);
|
||||||
|
|
||||||
|
// If we removed the last receiver for the alive frames, we need to pop them
|
||||||
|
// from the queue, because now they received by all receivers.
|
||||||
|
size_t alive_size = frame_ids_.size(kAliveFramesQueueIndex);
|
||||||
|
size_t other_size = GetLongestReceiverQueue();
|
||||||
|
while (alive_size > other_size) {
|
||||||
|
frame_ids_.PopFront(kAliveFramesQueueIndex);
|
||||||
|
alive_size--;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
uint16_t StreamState::MarkNextAliveFrameAsDead() {
|
uint16_t StreamState::MarkNextAliveFrameAsDead() {
|
||||||
absl::optional<uint16_t> frame_id =
|
absl::optional<uint16_t> frame_id =
|
||||||
frame_ids_.PopFront(GetAliveFramesQueueIndex());
|
frame_ids_.PopFront(kAliveFramesQueueIndex);
|
||||||
RTC_DCHECK(frame_id.has_value());
|
RTC_DCHECK(frame_id.has_value());
|
||||||
return frame_id.value();
|
return frame_id.value();
|
||||||
}
|
}
|
||||||
@ -78,27 +107,15 @@ absl::optional<Timestamp> StreamState::last_rendered_frame_time(
|
|||||||
return MaybeGetValue(last_rendered_frame_time_, peer);
|
return MaybeGetValue(last_rendered_frame_time_, peer);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t StreamState::GetPeerQueueIndex(size_t peer_index) const {
|
size_t StreamState::GetLongestReceiverQueue() const {
|
||||||
// When sender isn't expecting to receive its own stream we will use their
|
size_t max = 0;
|
||||||
// queue for tracking alive frames. Otherwise we will use the queue #0 to
|
for (size_t receiver : receivers_) {
|
||||||
// track alive frames and will shift all other queues for peers on 1.
|
size_t cur_size = frame_ids_.size(receiver);
|
||||||
// It means when `enable_receive_own_stream_` is true peer's queue will have
|
if (cur_size > max) {
|
||||||
// index equal to `peer_index` + 1 and when `enable_receive_own_stream_` is
|
max = cur_size;
|
||||||
// false peer's queue will have index equal to `peer_index`.
|
}
|
||||||
if (!enable_receive_own_stream_) {
|
|
||||||
return peer_index;
|
|
||||||
}
|
}
|
||||||
return peer_index + 1;
|
return max;
|
||||||
}
|
|
||||||
|
|
||||||
size_t StreamState::GetAliveFramesQueueIndex() const {
|
|
||||||
// When sender isn't expecting to receive its own stream we will use their
|
|
||||||
// queue for tracking alive frames. Otherwise we will use the queue #0 to
|
|
||||||
// track alive frames and will shift all other queues for peers on 1.
|
|
||||||
if (!enable_receive_own_stream_) {
|
|
||||||
return owner_;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace webrtc
|
} // namespace webrtc
|
||||||
|
@ -11,7 +11,9 @@
|
|||||||
#ifndef TEST_PC_E2E_ANALYZER_VIDEO_DEFAULT_VIDEO_QUALITY_ANALYZER_STREAM_STATE_H_
|
#ifndef TEST_PC_E2E_ANALYZER_VIDEO_DEFAULT_VIDEO_QUALITY_ANALYZER_STREAM_STATE_H_
|
||||||
#define TEST_PC_E2E_ANALYZER_VIDEO_DEFAULT_VIDEO_QUALITY_ANALYZER_STREAM_STATE_H_
|
#define TEST_PC_E2E_ANALYZER_VIDEO_DEFAULT_VIDEO_QUALITY_ANALYZER_STREAM_STATE_H_
|
||||||
|
|
||||||
|
#include <limits>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <set>
|
||||||
|
|
||||||
#include "absl/types/optional.h"
|
#include "absl/types/optional.h"
|
||||||
#include "api/units/timestamp.h"
|
#include "api/units/timestamp.h"
|
||||||
@ -21,39 +23,44 @@ namespace webrtc {
|
|||||||
|
|
||||||
// Represents a current state of video stream inside
|
// Represents a current state of video stream inside
|
||||||
// DefaultVideoQualityAnalyzer.
|
// DefaultVideoQualityAnalyzer.
|
||||||
|
//
|
||||||
|
// Maintains the sequence of frames for each video stream and keeps track about
|
||||||
|
// which frames were seen by each of the possible stream receiver.
|
||||||
|
//
|
||||||
|
// Keeps information about which frames are alive and which are dead. Frame is
|
||||||
|
// alive if it contains VideoFrame payload for corresponding FrameInFlight
|
||||||
|
// object inside DefaultVideoQualityAnalyzer, otherwise frame is considered
|
||||||
|
// dead.
|
||||||
|
//
|
||||||
|
// Supports peer indexes from 0 to max(size_t) - 1.
|
||||||
class StreamState {
|
class StreamState {
|
||||||
public:
|
public:
|
||||||
StreamState(size_t owner,
|
StreamState(size_t sender,
|
||||||
size_t peers_count,
|
std::set<size_t> receivers,
|
||||||
bool enable_receive_own_stream,
|
Timestamp stream_started_time);
|
||||||
Timestamp stream_started_time)
|
|
||||||
: owner_(owner),
|
|
||||||
enable_receive_own_stream_(enable_receive_own_stream),
|
|
||||||
stream_started_time_(stream_started_time),
|
|
||||||
frame_ids_(enable_receive_own_stream ? peers_count + 1 : peers_count) {}
|
|
||||||
|
|
||||||
size_t owner() const { return owner_; }
|
size_t sender() const { return sender_; }
|
||||||
Timestamp stream_started_time() const { return stream_started_time_; }
|
Timestamp stream_started_time() const { return stream_started_time_; }
|
||||||
|
|
||||||
void PushBack(uint16_t frame_id) { frame_ids_.PushBack(frame_id); }
|
void PushBack(uint16_t frame_id) { frame_ids_.PushBack(frame_id); }
|
||||||
// Crash if state is empty. Guarantees that there can be no alive frames
|
|
||||||
// that are not in the owner queue
|
|
||||||
uint16_t PopFront(size_t peer);
|
|
||||||
bool IsEmpty(size_t peer) const {
|
|
||||||
return frame_ids_.IsEmpty(GetPeerQueueIndex(peer));
|
|
||||||
}
|
|
||||||
// Crash if state is empty.
|
// Crash if state is empty.
|
||||||
uint16_t Front(size_t peer) const {
|
uint16_t PopFront(size_t peer);
|
||||||
return frame_ids_.Front(GetPeerQueueIndex(peer)).value();
|
bool IsEmpty(size_t peer) const { return frame_ids_.IsEmpty(peer); }
|
||||||
}
|
// Crash if state is empty.
|
||||||
|
uint16_t Front(size_t peer) const { return frame_ids_.Front(peer).value(); }
|
||||||
|
|
||||||
// When new peer is added - all current alive frames will be sent to it as
|
// Adds a new peer to the state. All currently alive frames will be expected
|
||||||
// well. So we need to register them as expected by copying owner_ head to
|
// to be received by the newly added peer.
|
||||||
// the new head.
|
void AddPeer(size_t peer);
|
||||||
void AddPeer() { frame_ids_.AddReader(GetAliveFramesQueueIndex()); }
|
|
||||||
|
// Removes peer from the state. Frames that were expected to be received by
|
||||||
|
// this peer will be removed from it. On the other hand last rendered frame
|
||||||
|
// time for the removed peer will be preserved, because
|
||||||
|
// DefaultVideoQualityAnalyzer still may request it for stats processing.
|
||||||
|
void RemovePeer(size_t peer);
|
||||||
|
|
||||||
size_t GetAliveFramesCount() const {
|
size_t GetAliveFramesCount() const {
|
||||||
return frame_ids_.size(GetAliveFramesQueueIndex());
|
return frame_ids_.size(kAliveFramesQueueIndex);
|
||||||
}
|
}
|
||||||
uint16_t MarkNextAliveFrameAsDead();
|
uint16_t MarkNextAliveFrameAsDead();
|
||||||
|
|
||||||
@ -61,19 +68,17 @@ class StreamState {
|
|||||||
absl::optional<Timestamp> last_rendered_frame_time(size_t peer) const;
|
absl::optional<Timestamp> last_rendered_frame_time(size_t peer) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Returns index of the `frame_ids_` queue which is used for specified
|
// Index of the `frame_ids_` queue which is used to track alive frames for
|
||||||
// `peer_index`.
|
// this stream.
|
||||||
size_t GetPeerQueueIndex(size_t peer_index) const;
|
static constexpr size_t kAliveFramesQueueIndex =
|
||||||
|
std::numeric_limits<size_t>::max();
|
||||||
|
|
||||||
// Returns index of the `frame_ids_` queue which is used to track alive
|
size_t GetLongestReceiverQueue() const;
|
||||||
// frames for this stream. The frame is alive if it contains VideoFrame
|
|
||||||
// payload in `captured_frames_in_flight_`.
|
|
||||||
size_t GetAliveFramesQueueIndex() const;
|
|
||||||
|
|
||||||
// Index of the owner. Owner's queue in `frame_ids_` will keep alive frames.
|
// Index of the owner. Owner's queue in `frame_ids_` will keep alive frames.
|
||||||
const size_t owner_;
|
const size_t sender_;
|
||||||
const bool enable_receive_own_stream_;
|
|
||||||
const Timestamp stream_started_time_;
|
const Timestamp stream_started_time_;
|
||||||
|
std::set<size_t> receivers_;
|
||||||
// To correctly determine dropped frames we have to know sequence of frames
|
// To correctly determine dropped frames we have to know sequence of frames
|
||||||
// in each stream so we will keep a list of frame ids inside the stream.
|
// in each stream so we will keep a list of frame ids inside the stream.
|
||||||
// This list is represented by multi head queue of frame ids with separate
|
// This list is represented by multi head queue of frame ids with separate
|
||||||
|
@ -0,0 +1,126 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "test/pc/e2e/analyzer/video/default_video_quality_analyzer_stream_state.h"
|
||||||
|
|
||||||
|
#include <set>
|
||||||
|
|
||||||
|
#include "api/units/timestamp.h"
|
||||||
|
#include "test/gtest.h"
|
||||||
|
|
||||||
|
namespace webrtc {
|
||||||
|
namespace {
|
||||||
|
|
||||||
|
TEST(StreamStateTest, PopFrontAndFrontIndependentForEachPeer) {
|
||||||
|
StreamState state(/*sender=*/0,
|
||||||
|
/*receivers=*/std::set<size_t>{1, 2},
|
||||||
|
Timestamp::Seconds(1));
|
||||||
|
state.PushBack(/*frame_id=*/1);
|
||||||
|
state.PushBack(/*frame_id=*/2);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.Front(/*peer=*/1), 1);
|
||||||
|
EXPECT_EQ(state.PopFront(/*peer=*/1), 1);
|
||||||
|
EXPECT_EQ(state.Front(/*peer=*/1), 2);
|
||||||
|
EXPECT_EQ(state.PopFront(/*peer=*/1), 2);
|
||||||
|
EXPECT_EQ(state.Front(/*peer=*/2), 1);
|
||||||
|
EXPECT_EQ(state.PopFront(/*peer=*/2), 1);
|
||||||
|
EXPECT_EQ(state.Front(/*peer=*/2), 2);
|
||||||
|
EXPECT_EQ(state.PopFront(/*peer=*/2), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(StreamStateTest, IsEmpty) {
|
||||||
|
StreamState state(/*sender=*/0,
|
||||||
|
/*receivers=*/std::set<size_t>{1, 2},
|
||||||
|
Timestamp::Seconds(1));
|
||||||
|
state.PushBack(/*frame_id=*/1);
|
||||||
|
|
||||||
|
EXPECT_FALSE(state.IsEmpty(/*peer=*/1));
|
||||||
|
|
||||||
|
state.PopFront(/*peer=*/1);
|
||||||
|
|
||||||
|
EXPECT_TRUE(state.IsEmpty(/*peer=*/1));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(StreamStateTest, PopFrontForOnlyOnePeerDontChangeAliveFramesCount) {
|
||||||
|
StreamState state(/*sender=*/0,
|
||||||
|
/*receivers=*/std::set<size_t>{1, 2},
|
||||||
|
Timestamp::Seconds(1));
|
||||||
|
state.PushBack(/*frame_id=*/1);
|
||||||
|
state.PushBack(/*frame_id=*/2);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
|
||||||
|
|
||||||
|
state.PopFront(/*peer=*/1);
|
||||||
|
state.PopFront(/*peer=*/1);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(StreamStateTest, PopFrontForAllPeersReducesAliveFramesCount) {
|
||||||
|
StreamState state(/*sender=*/0,
|
||||||
|
/*receivers=*/std::set<size_t>{1, 2},
|
||||||
|
Timestamp::Seconds(1));
|
||||||
|
state.PushBack(/*frame_id=*/1);
|
||||||
|
state.PushBack(/*frame_id=*/2);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
|
||||||
|
|
||||||
|
state.PopFront(/*peer=*/1);
|
||||||
|
state.PopFront(/*peer=*/2);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.GetAliveFramesCount(), 1lu);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(StreamStateTest, RemovePeerForLastExpectedReceiverUpdatesAliveFrames) {
|
||||||
|
StreamState state(/*sender=*/0,
|
||||||
|
/*receivers=*/std::set<size_t>{1, 2},
|
||||||
|
Timestamp::Seconds(1));
|
||||||
|
state.PushBack(/*frame_id=*/1);
|
||||||
|
state.PushBack(/*frame_id=*/2);
|
||||||
|
|
||||||
|
state.PopFront(/*peer=*/1);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
|
||||||
|
|
||||||
|
state.RemovePeer(/*peer=*/2);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.GetAliveFramesCount(), 1lu);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(StreamStateTest, MarkNextAliveFrameAsDeadDecreseAliveFramesCount) {
|
||||||
|
StreamState state(/*sender=*/0,
|
||||||
|
/*receivers=*/std::set<size_t>{1, 2},
|
||||||
|
Timestamp::Seconds(1));
|
||||||
|
state.PushBack(/*frame_id=*/1);
|
||||||
|
state.PushBack(/*frame_id=*/2);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.GetAliveFramesCount(), 2lu);
|
||||||
|
|
||||||
|
state.MarkNextAliveFrameAsDead();
|
||||||
|
|
||||||
|
EXPECT_EQ(state.GetAliveFramesCount(), 1lu);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(StreamStateTest, MarkNextAliveFrameAsDeadDoesntAffectFrontFrameForPeer) {
|
||||||
|
StreamState state(/*sender=*/0,
|
||||||
|
/*receivers=*/std::set<size_t>{1, 2},
|
||||||
|
Timestamp::Seconds(1));
|
||||||
|
state.PushBack(/*frame_id=*/1);
|
||||||
|
state.PushBack(/*frame_id=*/2);
|
||||||
|
|
||||||
|
EXPECT_EQ(state.Front(/*peer=*/1), 1);
|
||||||
|
|
||||||
|
state.MarkNextAliveFrameAsDead();
|
||||||
|
|
||||||
|
EXPECT_EQ(state.Front(/*peer=*/1), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
} // namespace webrtc
|
@ -42,14 +42,6 @@ class MultiReaderQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds a new reader, initializing its reading position (the reader's head)
|
|
||||||
// equal to the one of `reader_to_copy`. New reader will have name index
|
|
||||||
// equal to the current readers count.
|
|
||||||
// Complexity O(MultiReaderQueue::size(reader_to_copy)).
|
|
||||||
void AddReader(size_t reader_to_copy) {
|
|
||||||
AddReader(heads_.size(), reader_to_copy);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adds a new `reader`, initializing its reading position (the reader's head)
|
// Adds a new `reader`, initializing its reading position (the reader's head)
|
||||||
// equal to the one of `reader_to_copy`.
|
// equal to the one of `reader_to_copy`.
|
||||||
// Complexity O(MultiReaderQueue::size(reader_to_copy)).
|
// Complexity O(MultiReaderQueue::size(reader_to_copy)).
|
||||||
@ -65,6 +57,19 @@ class MultiReaderQueue {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Adds a new `reader`, initializing its reading position equal to first
|
||||||
|
// element in the queue.
|
||||||
|
// Complexity O(MultiReaderQueue::size()).
|
||||||
|
void AddReader(size_t reader) {
|
||||||
|
auto it = heads_.find(reader);
|
||||||
|
RTC_CHECK(it == heads_.end())
|
||||||
|
<< "Reader " << reader << " is already in the queue";
|
||||||
|
heads_[reader] = removed_elements_count_;
|
||||||
|
for (size_t i = 0; i < queue_.size(); ++i) {
|
||||||
|
in_queues_[i]++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Removes specified `reader` from the queue.
|
// Removes specified `reader` from the queue.
|
||||||
// Complexity O(MultiReaderQueue::size(reader)).
|
// Complexity O(MultiReaderQueue::size(reader)).
|
||||||
void RemoveReader(size_t reader) {
|
void RemoveReader(size_t reader) {
|
||||||
|
@ -17,92 +17,94 @@ namespace webrtc {
|
|||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) {
|
TEST(MultiReaderQueueTest, EmptyQueueEmptyForAllHeads) {
|
||||||
MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
|
MultiReaderQueue<int> queue = MultiReaderQueue<int>(/*readers_count=*/10);
|
||||||
EXPECT_EQ(queue.size(), 0lu);
|
EXPECT_EQ(queue.size(), 0lu);
|
||||||
for (int i = 0; i < 10; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
EXPECT_TRUE(queue.IsEmpty(i));
|
EXPECT_TRUE(queue.IsEmpty(/*reader=*/i));
|
||||||
EXPECT_EQ(queue.size(i), 0lu);
|
EXPECT_EQ(queue.size(/*reader=*/i), 0lu);
|
||||||
EXPECT_FALSE(queue.PopFront(i).has_value());
|
EXPECT_FALSE(queue.PopFront(/*reader=*/i).has_value());
|
||||||
EXPECT_FALSE(queue.Front(i).has_value());
|
EXPECT_FALSE(queue.Front(/*reader=*/i).has_value());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) {
|
TEST(MultiReaderQueueTest, SizeIsEqualForAllHeadsAfterAddOnly) {
|
||||||
MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
|
MultiReaderQueue<int> queue = MultiReaderQueue<int>(/*readers_count=*/10);
|
||||||
queue.PushBack(1);
|
queue.PushBack(1);
|
||||||
queue.PushBack(2);
|
queue.PushBack(2);
|
||||||
queue.PushBack(3);
|
queue.PushBack(3);
|
||||||
EXPECT_EQ(queue.size(), 3lu);
|
EXPECT_EQ(queue.size(), 3lu);
|
||||||
for (int i = 0; i < 10; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
EXPECT_FALSE(queue.IsEmpty(i));
|
EXPECT_FALSE(queue.IsEmpty(/*reader=*/i));
|
||||||
EXPECT_EQ(queue.size(i), 3lu);
|
EXPECT_EQ(queue.size(/*reader=*/i), 3lu);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) {
|
TEST(MultiReaderQueueTest, SizeIsCorrectAfterRemoveFromOnlyOneHead) {
|
||||||
MultiReaderQueue<int> queue = MultiReaderQueue<int>(10);
|
MultiReaderQueue<int> queue = MultiReaderQueue<int>(/*readers_count=*/10);
|
||||||
for (int i = 0; i < 5; ++i) {
|
for (int i = 0; i < 5; ++i) {
|
||||||
queue.PushBack(i);
|
queue.PushBack(i);
|
||||||
}
|
}
|
||||||
EXPECT_EQ(queue.size(), 5lu);
|
EXPECT_EQ(queue.size(), 5lu);
|
||||||
// Removing elements from queue #0
|
// Removing elements from queue #0
|
||||||
for (int i = 0; i < 5; ++i) {
|
for (int i = 0; i < 5; ++i) {
|
||||||
EXPECT_EQ(queue.size(0), static_cast<size_t>(5 - i));
|
EXPECT_EQ(queue.size(/*reader=*/0), static_cast<size_t>(5 - i));
|
||||||
EXPECT_EQ(queue.PopFront(0), absl::optional<int>(i));
|
EXPECT_EQ(queue.PopFront(/*reader=*/0), absl::optional<int>(i));
|
||||||
for (int j = 1; j < 10; ++j) {
|
for (int j = 1; j < 10; ++j) {
|
||||||
EXPECT_EQ(queue.size(j), 5lu);
|
EXPECT_EQ(queue.size(/*reader=*/j), 5lu);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EXPECT_EQ(queue.size(0), 0lu);
|
EXPECT_EQ(queue.size(/*reader=*/0), 0lu);
|
||||||
EXPECT_TRUE(queue.IsEmpty(0));
|
EXPECT_TRUE(queue.IsEmpty(/*reader=*/0));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) {
|
TEST(MultiReaderQueueTest, SingleHeadOneAddOneRemove) {
|
||||||
MultiReaderQueue<int> queue = MultiReaderQueue<int>(1);
|
MultiReaderQueue<int> queue = MultiReaderQueue<int>(/*readers_count=*/1);
|
||||||
queue.PushBack(1);
|
queue.PushBack(1);
|
||||||
EXPECT_EQ(queue.size(), 1lu);
|
EXPECT_EQ(queue.size(), 1lu);
|
||||||
EXPECT_TRUE(queue.Front(0).has_value());
|
EXPECT_TRUE(queue.Front(/*reader=*/0).has_value());
|
||||||
EXPECT_EQ(queue.Front(0).value(), 1);
|
EXPECT_EQ(queue.Front(/*reader=*/0).value(), 1);
|
||||||
absl::optional<int> value = queue.PopFront(0);
|
absl::optional<int> value = queue.PopFront(/*reader=*/0);
|
||||||
EXPECT_TRUE(value.has_value());
|
EXPECT_TRUE(value.has_value());
|
||||||
EXPECT_EQ(value.value(), 1);
|
EXPECT_EQ(value.value(), 1);
|
||||||
EXPECT_EQ(queue.size(), 0lu);
|
EXPECT_EQ(queue.size(), 0lu);
|
||||||
EXPECT_TRUE(queue.IsEmpty(0));
|
EXPECT_TRUE(queue.IsEmpty(/*reader=*/0));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, SingleHead) {
|
TEST(MultiReaderQueueTest, SingleHead) {
|
||||||
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
|
MultiReaderQueue<size_t> queue =
|
||||||
|
MultiReaderQueue<size_t>(/*readers_count=*/1);
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
queue.PushBack(i);
|
queue.PushBack(i);
|
||||||
EXPECT_EQ(queue.size(), i + 1);
|
EXPECT_EQ(queue.size(), i + 1);
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
EXPECT_EQ(queue.Front(0), absl::optional<size_t>(i));
|
EXPECT_EQ(queue.Front(/*reader=*/0), absl::optional<size_t>(i));
|
||||||
EXPECT_EQ(queue.PopFront(0), absl::optional<size_t>(i));
|
EXPECT_EQ(queue.PopFront(/*reader=*/0), absl::optional<size_t>(i));
|
||||||
EXPECT_EQ(queue.size(), 10 - i - 1);
|
EXPECT_EQ(queue.size(), 10 - i - 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
|
TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
|
||||||
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
|
MultiReaderQueue<size_t> queue =
|
||||||
|
MultiReaderQueue<size_t>(/*readers_count=*/3);
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
queue.PushBack(i);
|
queue.PushBack(i);
|
||||||
EXPECT_EQ(queue.size(), i + 1);
|
EXPECT_EQ(queue.size(), i + 1);
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
absl::optional<size_t> value = queue.PopFront(0);
|
absl::optional<size_t> value = queue.PopFront(/*reader=*/0);
|
||||||
EXPECT_EQ(queue.size(), 10lu);
|
EXPECT_EQ(queue.size(), 10lu);
|
||||||
ASSERT_TRUE(value.has_value());
|
ASSERT_TRUE(value.has_value());
|
||||||
EXPECT_EQ(value.value(), i);
|
EXPECT_EQ(value.value(), i);
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
absl::optional<size_t> value = queue.PopFront(1);
|
absl::optional<size_t> value = queue.PopFront(/*reader=*/1);
|
||||||
EXPECT_EQ(queue.size(), 10lu);
|
EXPECT_EQ(queue.size(), 10lu);
|
||||||
ASSERT_TRUE(value.has_value());
|
ASSERT_TRUE(value.has_value());
|
||||||
EXPECT_EQ(value.value(), i);
|
EXPECT_EQ(value.value(), i);
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
absl::optional<size_t> value = queue.PopFront(2);
|
absl::optional<size_t> value = queue.PopFront(/*reader=*/2);
|
||||||
EXPECT_EQ(queue.size(), 10 - i - 1);
|
EXPECT_EQ(queue.size(), 10 - i - 1);
|
||||||
ASSERT_TRUE(value.has_value());
|
ASSERT_TRUE(value.has_value());
|
||||||
EXPECT_EQ(value.value(), i);
|
EXPECT_EQ(value.value(), i);
|
||||||
@ -110,15 +112,16 @@ TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAllPerHead) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) {
|
TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) {
|
||||||
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(3);
|
MultiReaderQueue<size_t> queue =
|
||||||
|
MultiReaderQueue<size_t>(/*readers_count=*/3);
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
queue.PushBack(i);
|
queue.PushBack(i);
|
||||||
EXPECT_EQ(queue.size(), i + 1);
|
EXPECT_EQ(queue.size(), i + 1);
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
absl::optional<size_t> value1 = queue.PopFront(0);
|
absl::optional<size_t> value1 = queue.PopFront(/*reader=*/0);
|
||||||
absl::optional<size_t> value2 = queue.PopFront(1);
|
absl::optional<size_t> value2 = queue.PopFront(/*reader=*/1);
|
||||||
absl::optional<size_t> value3 = queue.PopFront(2);
|
absl::optional<size_t> value3 = queue.PopFront(/*reader=*/2);
|
||||||
EXPECT_EQ(queue.size(), 10 - i - 1);
|
EXPECT_EQ(queue.size(), 10 - i - 1);
|
||||||
ASSERT_TRUE(value1.has_value());
|
ASSERT_TRUE(value1.has_value());
|
||||||
ASSERT_TRUE(value2.has_value());
|
ASSERT_TRUE(value2.has_value());
|
||||||
@ -129,40 +132,65 @@ TEST(MultiReaderQueueTest, ThreeHeadsAddAllRemoveAll) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, AddReader) {
|
TEST(MultiReaderQueueTest, AddReaderSeeElementsOnlyFromReaderToCopy) {
|
||||||
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
|
MultiReaderQueue<size_t> queue =
|
||||||
|
MultiReaderQueue<size_t>(/*readers_count=*/2);
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
queue.PushBack(i);
|
queue.PushBack(i);
|
||||||
EXPECT_EQ(queue.size(), i + 1);
|
|
||||||
}
|
}
|
||||||
queue.AddReader(0);
|
for (size_t i = 0; i < 5; ++i) {
|
||||||
EXPECT_EQ(queue.readers_count(), 2lu);
|
queue.PopFront(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.AddReader(/*reader=*/2, /*reader_to_copy=*/0);
|
||||||
|
|
||||||
|
EXPECT_EQ(queue.readers_count(), 3lu);
|
||||||
|
for (size_t i = 5; i < 10; ++i) {
|
||||||
|
absl::optional<size_t> value = queue.PopFront(/*reader=*/2);
|
||||||
|
EXPECT_EQ(queue.size(/*reader=*/2), 10 - i - 1);
|
||||||
|
ASSERT_TRUE(value.has_value());
|
||||||
|
EXPECT_EQ(value.value(), i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(MultiReaderQueueTest, AddReaderWithoutReaderToCopySeeFullQueue) {
|
||||||
|
MultiReaderQueue<size_t> queue =
|
||||||
|
MultiReaderQueue<size_t>(/*readers_count=*/2);
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
absl::optional<size_t> value1 = queue.PopFront(0);
|
queue.PushBack(i);
|
||||||
absl::optional<size_t> value2 = queue.PopFront(1);
|
}
|
||||||
EXPECT_EQ(queue.size(), 10 - i - 1);
|
for (size_t i = 0; i < 5; ++i) {
|
||||||
ASSERT_TRUE(value1.has_value());
|
queue.PopFront(/*reader=*/0);
|
||||||
ASSERT_TRUE(value2.has_value());
|
}
|
||||||
EXPECT_EQ(value1.value(), i);
|
|
||||||
EXPECT_EQ(value2.value(), i);
|
queue.AddReader(/*reader=*/2);
|
||||||
|
|
||||||
|
EXPECT_EQ(queue.readers_count(), 3lu);
|
||||||
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
|
absl::optional<size_t> value = queue.PopFront(/*reader=*/2);
|
||||||
|
EXPECT_EQ(queue.size(/*reader=*/2), 10 - i - 1);
|
||||||
|
ASSERT_TRUE(value.has_value());
|
||||||
|
EXPECT_EQ(value.value(), i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) {
|
TEST(MultiReaderQueueTest, RemoveReaderWontChangeOthers) {
|
||||||
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(2);
|
MultiReaderQueue<size_t> queue =
|
||||||
|
MultiReaderQueue<size_t>(/*readers_count=*/2);
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
queue.PushBack(i);
|
queue.PushBack(i);
|
||||||
}
|
}
|
||||||
EXPECT_EQ(queue.size(1), 10lu);
|
EXPECT_EQ(queue.size(/*reader=*/1), 10lu);
|
||||||
|
|
||||||
queue.RemoveReader(0);
|
queue.RemoveReader(0);
|
||||||
|
|
||||||
EXPECT_EQ(queue.readers_count(), 1lu);
|
EXPECT_EQ(queue.readers_count(), 1lu);
|
||||||
EXPECT_EQ(queue.size(1), 10lu);
|
EXPECT_EQ(queue.size(/*reader=*/1), 10lu);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) {
|
TEST(MultiReaderQueueTest, RemoveLastReaderMakesQueueEmpty) {
|
||||||
MultiReaderQueue<size_t> queue = MultiReaderQueue<size_t>(1);
|
MultiReaderQueue<size_t> queue =
|
||||||
|
MultiReaderQueue<size_t>(/*readers_count=*/1);
|
||||||
for (size_t i = 0; i < 10; ++i) {
|
for (size_t i = 0; i < 10; ++i) {
|
||||||
queue.PushBack(i);
|
queue.PushBack(i);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user