From 2671e24eaf34cad26c3dfebc9e793c5127ccbebc Mon Sep 17 00:00:00 2001 From: philipel Date: Mon, 3 Oct 2022 13:59:07 +0200 Subject: [PATCH] In RtpReplayer, make Call, StreamState and RtpFileReader member variables. Bug: webrtc:14508 Change-Id: Icf903adf1a244e527615918689d3a7fd1862810c Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/277740 Reviewed-by: Mirko Bonadei Commit-Queue: Philip Eliasson Cr-Commit-Position: refs/heads/main@{#38275} --- rtc_tools/video_replay.cc | 112 +++++++++++++++++++------------------- 1 file changed, 55 insertions(+), 57 deletions(-) diff --git a/rtc_tools/video_replay.cc b/rtc_tools/video_replay.cc index bc9bcd13d1..e288da5c11 100644 --- a/rtc_tools/video_replay.cc +++ b/rtc_tools/video_replay.cc @@ -442,71 +442,65 @@ class RtpReplayer final { worker_thread_(std::make_unique( task_queue_factory_->CreateTaskQueue( "worker_thread", - TaskQueueFactory::Priority::NORMAL))) {} + TaskQueueFactory::Priority::NORMAL))), + rtp_reader_(CreateRtpReader(rtp_dump_path_)) { + rtc::Event event; + worker_thread_->PostTask([&]() { + Call::Config call_config(&event_log_); + call_config.trials = field_trials_.get(); + call_config.task_queue_factory = task_queue_factory_.get(); + call_.reset(Call::Create(call_config)); + + // Creation of the streams must happen inside a task queue because it is + // resued as a worker thread. + if (replay_config_path_.empty()) { + stream_state_ = ConfigureFromFlags(rtp_dump_path_, call_.get()); + } else { + stream_state_ = ConfigureFromFile(replay_config_path_, call_.get()); + } + event.Set(); + }); + event.Wait(/*give_up_after=*/TimeDelta::Seconds(10)); + + RTC_CHECK(stream_state_); + RTC_CHECK(rtp_reader_); + } + + ~RtpReplayer() { + // Destruction of streams and the call must happen on the same thread as + // their creation. + rtc::Event event; + worker_thread_->PostTask([&]() { + for (const auto& receive_stream : stream_state_->receive_streams) { + call_->DestroyVideoReceiveStream(receive_stream); + } + for (const auto& flexfec_stream : stream_state_->flexfec_streams) { + call_->DestroyFlexfecReceiveStream(flexfec_stream); + } + call_.reset(); + event.Set(); + }); + event.Wait(/*give_up_after=*/TimeDelta::Seconds(10)); + } void Run() { - webrtc::RtcEventLogNull event_log; - Call::Config call_config(&event_log); - call_config.trials = field_trials_.get(); - - rtc::Event sync_event(/*manual_reset=*/false, - /*initially_signalled=*/false); - call_config.task_queue_factory = task_queue_factory_.get(); - - // Creation of the streams must happen inside a task queue because it is - // resued as a worker thread. - std::unique_ptr call; - std::unique_ptr stream_state; + rtc::Event event; worker_thread_->PostTask([&]() { - call.reset(Call::Create(call_config)); - - // Attempt to load the configuration - if (replay_config_path_.empty()) { - stream_state = ConfigureFromFlags(rtp_dump_path_, call.get()); - } else { - stream_state = ConfigureFromFile(replay_config_path_, call.get()); - } - - if (stream_state == nullptr) { - return; - } // Start replaying the provided stream now that it has been configured. // VideoReceiveStreams must be started on the same thread as they were // created on. - for (const auto& receive_stream : stream_state->receive_streams) { + for (const auto& receive_stream : stream_state_->receive_streams) { receive_stream->Start(); } - sync_event.Set(); + event.Set(); }); + event.Wait(/*give_up_after=*/TimeDelta::Seconds(10)); - // Attempt to create an RtpReader from the input file. - std::unique_ptr rtp_reader = - CreateRtpReader(rtp_dump_path_); - - // Wait for streams creation. - sync_event.Wait(/*give_up_after=*/TimeDelta::Seconds(10)); - - if (stream_state != nullptr && rtp_reader != nullptr) { - ReplayPackets(call.get(), rtp_reader.get()); - } - - // Destruction of streams and the call must happen on the same thread as - // their creation. - worker_thread_->PostTask([&]() { - for (const auto& receive_stream : stream_state->receive_streams) { - call->DestroyVideoReceiveStream(receive_stream); - } - for (const auto& flexfec_stream : stream_state->flexfec_streams) { - call->DestroyFlexfecReceiveStream(flexfec_stream); - } - call.reset(); - sync_event.Set(); - }); - sync_event.Wait(/*give_up_after=*/TimeDelta::Seconds(10)); + ReplayPackets(); } private: - void ReplayPackets(Call* call, test::RtpFileReader* rtp_reader) { + void ReplayPackets() { int64_t replay_start_ms = -1; int num_packets = 0; std::map unknown_packets; @@ -520,7 +514,7 @@ class RtpReplayer final { } test::RtpPacket packet; - if (!rtp_reader->NextPacket(&packet)) { + if (!rtp_reader_->NextPacket(&packet)) { break; } rtc::CopyOnWriteBuffer packet_buffer(packet.data, packet.length); @@ -541,9 +535,9 @@ class RtpReplayer final { worker_thread_->PostTask([&]() { MediaType media_type = IsRtcpPacket(packet_buffer) ? MediaType::ANY : MediaType::VIDEO; - result = call->Receiver()->DeliverPacket(media_type, - std::move(packet_buffer), - /* packet_time_us */ -1); + result = call_->Receiver()->DeliverPacket(media_type, + std::move(packet_buffer), + /* packet_time_us */ -1); event.Set(); }); event.Wait(/*give_up_after=*/TimeDelta::Seconds(10)); @@ -577,10 +571,14 @@ class RtpReplayer final { const std::string replay_config_path_; const std::string rtp_dump_path_; + webrtc::RtcEventLogNull event_log_; std::unique_ptr field_trials_; std::unique_ptr task_queue_factory_; std::unique_ptr worker_thread_; -}; // class RtpReplayer + std::unique_ptr call_; + std::unique_ptr rtp_reader_; + std::unique_ptr stream_state_; +}; void RtpReplay() { RtpReplayer replayer(absl::GetFlag(FLAGS_config_file),