diff --git a/rtc_tools/video_replay.cc b/rtc_tools/video_replay.cc index ddcae6f87a..d17a57e826 100644 --- a/rtc_tools/video_replay.cc +++ b/rtc_tools/video_replay.cc @@ -265,35 +265,64 @@ class RtpReplayer final { const std::string& rtp_dump_path) { std::unique_ptr task_queue_factory = webrtc::CreateDefaultTaskQueueFactory(); + auto worker_thread = task_queue_factory->CreateTaskQueue( + "worker_thread", TaskQueueFactory::Priority::NORMAL); + rtc::Event sync_event(/*manual_reset=*/false, + /*initially_signalled=*/false); webrtc::RtcEventLogNull event_log; Call::Config call_config(&event_log); call_config.task_queue_factory = task_queue_factory.get(); call_config.trials = new FieldTrialBasedConfig(); - std::unique_ptr call(Call::Create(call_config)); + std::unique_ptr call; std::unique_ptr stream_state; - // Attempt to load the configuration - if (replay_config_path.empty()) { - stream_state = ConfigureFromFlags(rtp_dump_path, call.get()); - } else { - stream_state = ConfigureFromFile(replay_config_path, call.get()); - } - if (stream_state == nullptr) { - return; - } + + // Creation of the streams must happen inside a task queue because it is + // resued as a worker thread. + worker_thread->PostTask(ToQueuedTask([&]() { + call.reset(Call::Create(call_config)); + + // Attempt to load the configuration + if (replay_config_path.empty()) { + stream_state = ConfigureFromFlags(rtp_dump_path, call.get()); + } else { + stream_state = ConfigureFromFile(replay_config_path, call.get()); + } + + if (stream_state == nullptr) { + return; + } + // Start replaying the provided stream now that it has been configured. + // VideoReceiveStreams must be started on the same thread as they were + // created on. + for (const auto& receive_stream : stream_state->receive_streams) { + receive_stream->Start(); + } + sync_event.Set(); + })); + // Attempt to create an RtpReader from the input file. std::unique_ptr rtp_reader = CreateRtpReader(rtp_dump_path); - if (rtp_reader == nullptr) { + + // Wait for streams creation. + sync_event.Wait(/*give_up_after_ms=*/10000); + + if (stream_state == nullptr || rtp_reader == nullptr) { return; } - // Start replaying the provided stream now that it has been configured. - for (const auto& receive_stream : stream_state->receive_streams) { - receive_stream->Start(); - } - ReplayPackets(call.get(), rtp_reader.get()); - for (const auto& receive_stream : stream_state->receive_streams) { - call->DestroyVideoReceiveStream(receive_stream); - } + + ReplayPackets(call.get(), rtp_reader.get(), worker_thread.get()); + + // Destruction of streams and the call must happen on the same thread as + // their creation. + worker_thread->PostTask(ToQueuedTask([&]() { + for (const auto& receive_stream : stream_state->receive_streams) { + call->DestroyVideoReceiveStream(receive_stream); + } + call.reset(); + sync_event.Set(); + })); + sync_event.Wait(/*give_up_after_ms=*/10000); } private: @@ -435,10 +464,13 @@ class RtpReplayer final { return rtp_reader; } - static void ReplayPackets(Call* call, test::RtpFileReader* rtp_reader) { + static void ReplayPackets(Call* call, + test::RtpFileReader* rtp_reader, + TaskQueueBase* worker_thread) { int64_t replay_start_ms = -1; int num_packets = 0; std::map unknown_packets; + rtc::Event event(/*manual_reset=*/false, /*initially_signalled=*/false); while (true) { int64_t now_ms = rtc::TimeMillis(); if (replay_start_ms == -1) { @@ -456,10 +488,16 @@ class RtpReplayer final { } ++num_packets; - switch (call->Receiver()->DeliverPacket( - webrtc::MediaType::VIDEO, - rtc::CopyOnWriteBuffer(packet.data, packet.length), - /* packet_time_us */ -1)) { + PacketReceiver::DeliveryStatus result = PacketReceiver::DELIVERY_OK; + worker_thread->PostTask(ToQueuedTask([&]() { + result = call->Receiver()->DeliverPacket( + webrtc::MediaType::VIDEO, + rtc::CopyOnWriteBuffer(packet.data, packet.length), + /* packet_time_us */ -1); + event.Set(); + })); + event.Wait(/*give_up_after_ms=*/10000); + switch (result) { case PacketReceiver::DELIVERY_OK: break; case PacketReceiver::DELIVERY_UNKNOWN_SSRC: {