In RtpReplayer, make Call, StreamState and RtpFileReader member variables.

Bug: webrtc:14508
Change-Id: Icf903adf1a244e527615918689d3a7fd1862810c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/277740
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38275}
This commit is contained in:
philipel
2022-10-03 13:59:07 +02:00
committed by WebRTC LUCI CQ
parent 5f55137a34
commit 2671e24eaf

View File

@ -442,71 +442,65 @@ class RtpReplayer final {
worker_thread_(std::make_unique<rtc::TaskQueue>(
task_queue_factory_->CreateTaskQueue(
"worker_thread",
TaskQueueFactory::Priority::NORMAL))) {}
TaskQueueFactory::Priority::NORMAL))),
rtp_reader_(CreateRtpReader(rtp_dump_path_)) {
rtc::Event event;
worker_thread_->PostTask([&]() {
Call::Config call_config(&event_log_);
call_config.trials = field_trials_.get();
call_config.task_queue_factory = task_queue_factory_.get();
call_.reset(Call::Create(call_config));
// Creation of the streams must happen inside a task queue because it is
// resued as a worker thread.
if (replay_config_path_.empty()) {
stream_state_ = ConfigureFromFlags(rtp_dump_path_, call_.get());
} else {
stream_state_ = ConfigureFromFile(replay_config_path_, call_.get());
}
event.Set();
});
event.Wait(/*give_up_after=*/TimeDelta::Seconds(10));
RTC_CHECK(stream_state_);
RTC_CHECK(rtp_reader_);
}
~RtpReplayer() {
// Destruction of streams and the call must happen on the same thread as
// their creation.
rtc::Event event;
worker_thread_->PostTask([&]() {
for (const auto& receive_stream : stream_state_->receive_streams) {
call_->DestroyVideoReceiveStream(receive_stream);
}
for (const auto& flexfec_stream : stream_state_->flexfec_streams) {
call_->DestroyFlexfecReceiveStream(flexfec_stream);
}
call_.reset();
event.Set();
});
event.Wait(/*give_up_after=*/TimeDelta::Seconds(10));
}
void Run() {
webrtc::RtcEventLogNull event_log;
Call::Config call_config(&event_log);
call_config.trials = field_trials_.get();
rtc::Event sync_event(/*manual_reset=*/false,
/*initially_signalled=*/false);
call_config.task_queue_factory = task_queue_factory_.get();
// Creation of the streams must happen inside a task queue because it is
// resued as a worker thread.
std::unique_ptr<Call> call;
std::unique_ptr<StreamState> stream_state;
rtc::Event event;
worker_thread_->PostTask([&]() {
call.reset(Call::Create(call_config));
// Attempt to load the configuration
if (replay_config_path_.empty()) {
stream_state = ConfigureFromFlags(rtp_dump_path_, call.get());
} else {
stream_state = ConfigureFromFile(replay_config_path_, call.get());
}
if (stream_state == nullptr) {
return;
}
// Start replaying the provided stream now that it has been configured.
// VideoReceiveStreams must be started on the same thread as they were
// created on.
for (const auto& receive_stream : stream_state->receive_streams) {
for (const auto& receive_stream : stream_state_->receive_streams) {
receive_stream->Start();
}
sync_event.Set();
event.Set();
});
event.Wait(/*give_up_after=*/TimeDelta::Seconds(10));
// Attempt to create an RtpReader from the input file.
std::unique_ptr<test::RtpFileReader> rtp_reader =
CreateRtpReader(rtp_dump_path_);
// Wait for streams creation.
sync_event.Wait(/*give_up_after=*/TimeDelta::Seconds(10));
if (stream_state != nullptr && rtp_reader != nullptr) {
ReplayPackets(call.get(), rtp_reader.get());
}
// Destruction of streams and the call must happen on the same thread as
// their creation.
worker_thread_->PostTask([&]() {
for (const auto& receive_stream : stream_state->receive_streams) {
call->DestroyVideoReceiveStream(receive_stream);
}
for (const auto& flexfec_stream : stream_state->flexfec_streams) {
call->DestroyFlexfecReceiveStream(flexfec_stream);
}
call.reset();
sync_event.Set();
});
sync_event.Wait(/*give_up_after=*/TimeDelta::Seconds(10));
ReplayPackets();
}
private:
void ReplayPackets(Call* call, test::RtpFileReader* rtp_reader) {
void ReplayPackets() {
int64_t replay_start_ms = -1;
int num_packets = 0;
std::map<uint32_t, int> unknown_packets;
@ -520,7 +514,7 @@ class RtpReplayer final {
}
test::RtpPacket packet;
if (!rtp_reader->NextPacket(&packet)) {
if (!rtp_reader_->NextPacket(&packet)) {
break;
}
rtc::CopyOnWriteBuffer packet_buffer(packet.data, packet.length);
@ -541,9 +535,9 @@ class RtpReplayer final {
worker_thread_->PostTask([&]() {
MediaType media_type =
IsRtcpPacket(packet_buffer) ? MediaType::ANY : MediaType::VIDEO;
result = call->Receiver()->DeliverPacket(media_type,
std::move(packet_buffer),
/* packet_time_us */ -1);
result = call_->Receiver()->DeliverPacket(media_type,
std::move(packet_buffer),
/* packet_time_us */ -1);
event.Set();
});
event.Wait(/*give_up_after=*/TimeDelta::Seconds(10));
@ -577,10 +571,14 @@ class RtpReplayer final {
const std::string replay_config_path_;
const std::string rtp_dump_path_;
webrtc::RtcEventLogNull event_log_;
std::unique_ptr<FieldTrialsView> field_trials_;
std::unique_ptr<TaskQueueFactory> task_queue_factory_;
std::unique_ptr<rtc::TaskQueue> worker_thread_;
}; // class RtpReplayer
std::unique_ptr<Call> call_;
std::unique_ptr<test::RtpFileReader> rtp_reader_;
std::unique_ptr<StreamState> stream_state_;
};
void RtpReplay() {
RtpReplayer replayer(absl::GetFlag(FLAGS_config_file),