diff --git a/rtc_tools/video_replay.cc b/rtc_tools/video_replay.cc index 033071e771..bc9bcd13d1 100644 --- a/rtc_tools/video_replay.cc +++ b/rtc_tools/video_replay.cc @@ -256,38 +256,215 @@ class DecoderIvfFileWriter : public test::FakeDecoder { VideoCodecType video_codec_type_; }; -// The RtpReplayer is responsible for parsing the configuration provided by the -// user, setting up the windows, receive streams and decoders and then replaying -// the provided RTP dump. +// Holds all the shared memory structures required for a receive stream. This +// structure is used to prevent members being deallocated before the replay +// has been finished. +struct StreamState { + test::NullTransport transport; + std::vector>> sinks; + std::vector receive_streams; + std::vector flexfec_streams; + std::unique_ptr decoder_factory; +}; + +// Loads multiple configurations from the provided configuration file. +std::unique_ptr ConfigureFromFile(const std::string& config_path, + Call* call) { + auto stream_state = std::make_unique(); + // Parse the configuration file. + std::ifstream config_file(config_path); + std::stringstream raw_json_buffer; + raw_json_buffer << config_file.rdbuf(); + std::string raw_json = raw_json_buffer.str(); + Json::CharReaderBuilder builder; + Json::Value json_configs; + std::string error_message; + std::unique_ptr json_reader(builder.newCharReader()); + if (!json_reader->parse(raw_json.data(), raw_json.data() + raw_json.size(), + &json_configs, &error_message)) { + fprintf(stderr, "Error parsing JSON config\n"); + fprintf(stderr, "%s\n", error_message.c_str()); + return nullptr; + } + + stream_state->decoder_factory = std::make_unique(); + size_t config_count = 0; + for (const auto& json : json_configs) { + // Create the configuration and parse the JSON into the config. + auto receive_config = + ParseVideoReceiveStreamJsonConfig(&(stream_state->transport), json); + // Instantiate the underlying decoder. + for (auto& decoder : receive_config.decoders) { + decoder = test::CreateMatchingDecoder(decoder.payload_type, + decoder.video_format.name); + } + // Create a window for this config. + std::stringstream window_title; + window_title << "Playback Video (" << config_count++ << ")"; + stream_state->sinks.emplace_back(test::VideoRenderer::Create( + window_title.str().c_str(), absl::GetFlag(FLAGS_render_width), + absl::GetFlag(FLAGS_render_height))); + // Create a receive stream for this config. + receive_config.renderer = stream_state->sinks.back().get(); + receive_config.decoder_factory = stream_state->decoder_factory.get(); + stream_state->receive_streams.emplace_back( + call->CreateVideoReceiveStream(std::move(receive_config))); + } + return stream_state; +} + +// Loads the base configuration from flags passed in on the commandline. +std::unique_ptr ConfigureFromFlags( + const std::string& rtp_dump_path, + Call* call) { + auto stream_state = std::make_unique(); + // Create the video renderers. We must add both to the stream state to keep + // them from deallocating. + std::stringstream window_title; + window_title << "Playback Video (" << rtp_dump_path << ")"; + std::unique_ptr playback_video( + test::VideoRenderer::Create(window_title.str().c_str(), + absl::GetFlag(FLAGS_render_width), + absl::GetFlag(FLAGS_render_height))); + auto file_passthrough = std::make_unique( + absl::GetFlag(FLAGS_out_base), playback_video.get()); + stream_state->sinks.push_back(std::move(playback_video)); + stream_state->sinks.push_back(std::move(file_passthrough)); + // Setup the configuration from the flags. + VideoReceiveStreamInterface::Config receive_config( + &(stream_state->transport)); + receive_config.rtp.remote_ssrc = absl::GetFlag(FLAGS_ssrc); + receive_config.rtp.local_ssrc = kReceiverLocalSsrc; + receive_config.rtp.rtx_ssrc = absl::GetFlag(FLAGS_ssrc_rtx); + receive_config.rtp.rtx_associated_payload_types[absl::GetFlag( + FLAGS_media_payload_type_rtx)] = absl::GetFlag(FLAGS_media_payload_type); + receive_config.rtp + .rtx_associated_payload_types[absl::GetFlag(FLAGS_red_payload_type_rtx)] = + absl::GetFlag(FLAGS_red_payload_type); + receive_config.rtp.ulpfec_payload_type = + absl::GetFlag(FLAGS_ulpfec_payload_type); + receive_config.rtp.red_payload_type = absl::GetFlag(FLAGS_red_payload_type); + receive_config.rtp.nack.rtp_history_ms = 1000; + + if (absl::GetFlag(FLAGS_flexfec_payload_type) != -1) { + receive_config.rtp.protected_by_flexfec = true; + webrtc::FlexfecReceiveStream::Config flexfec_config( + &(stream_state->transport)); + flexfec_config.payload_type = absl::GetFlag(FLAGS_flexfec_payload_type); + flexfec_config.protected_media_ssrcs.push_back(absl::GetFlag(FLAGS_ssrc)); + flexfec_config.rtp.remote_ssrc = absl::GetFlag(FLAGS_ssrc_flexfec); + FlexfecReceiveStream* flexfec_stream = + call->CreateFlexfecReceiveStream(flexfec_config); + receive_config.rtp.packet_sink_ = flexfec_stream; + stream_state->flexfec_streams.push_back(flexfec_stream); + } + + if (absl::GetFlag(FLAGS_transmission_offset_id) != -1) { + receive_config.rtp.extensions.push_back( + RtpExtension(RtpExtension::kTimestampOffsetUri, + absl::GetFlag(FLAGS_transmission_offset_id))); + } + if (absl::GetFlag(FLAGS_abs_send_time_id) != -1) { + receive_config.rtp.extensions.push_back(RtpExtension( + RtpExtension::kAbsSendTimeUri, absl::GetFlag(FLAGS_abs_send_time_id))); + } + receive_config.renderer = stream_state->sinks.back().get(); + + // Setup the receiving stream + VideoReceiveStreamInterface::Decoder decoder; + decoder = test::CreateMatchingDecoder(absl::GetFlag(FLAGS_media_payload_type), + absl::GetFlag(FLAGS_codec)); + if (!absl::GetFlag(FLAGS_decoder_bitstream_filename).empty()) { + // Replace decoder with file writer if we're writing the bitstream to a + // file instead. + stream_state->decoder_factory = + std::make_unique([]() { + return std::make_unique( + absl::GetFlag(FLAGS_decoder_bitstream_filename).c_str()); + }); + } else if (!absl::GetFlag(FLAGS_decoder_ivf_filename).empty()) { + // Replace decoder with file writer if we're writing the ivf to a + // file instead. + stream_state->decoder_factory = + std::make_unique([]() { + return std::make_unique( + absl::GetFlag(FLAGS_decoder_ivf_filename).c_str(), + absl::GetFlag(FLAGS_codec)); + }); + } else { + stream_state->decoder_factory = std::make_unique(); + } + receive_config.decoder_factory = stream_state->decoder_factory.get(); + receive_config.decoders.push_back(decoder); + + stream_state->receive_streams.emplace_back( + call->CreateVideoReceiveStream(std::move(receive_config))); + return stream_state; +} + +std::unique_ptr CreateRtpReader( + const std::string& rtp_dump_path) { + std::unique_ptr rtp_reader(test::RtpFileReader::Create( + test::RtpFileReader::kRtpDump, rtp_dump_path)); + if (!rtp_reader) { + rtp_reader.reset( + test::RtpFileReader::Create(test::RtpFileReader::kPcap, rtp_dump_path)); + if (!rtp_reader) { + fprintf(stderr, + "Couldn't open input file as either a rtpdump or .pcap. Note " + "that .pcapng is not supported.\nTrying to interpret the file as " + "length/packet interleaved.\n"); + rtp_reader.reset(test::RtpFileReader::Create( + test::RtpFileReader::kLengthPacketInterleaved, rtp_dump_path)); + if (!rtp_reader) { + fprintf(stderr, + "Unable to open input file with any supported format\n"); + return nullptr; + } + } + } + return rtp_reader; +} + +// The RtpReplayer is responsible for parsing the configuration provided by +// the user, setting up the windows, receive streams and decoders and then +// replaying the provided RTP dump. class RtpReplayer final { public: - // Replay a rtp dump with an optional json configuration. - static void Replay(const std::string& replay_config_path, - const std::string& rtp_dump_path) { + RtpReplayer(absl::string_view replay_config_path, + absl::string_view rtp_dump_path, + std::unique_ptr field_trials) + : replay_config_path_(replay_config_path), + rtp_dump_path_(rtp_dump_path), + field_trials_(std::move(field_trials)), + task_queue_factory_( + webrtc::CreateDefaultTaskQueueFactory(field_trials_.get())), + worker_thread_(std::make_unique( + task_queue_factory_->CreateTaskQueue( + "worker_thread", + TaskQueueFactory::Priority::NORMAL))) {} + + void Run() { webrtc::RtcEventLogNull event_log; Call::Config call_config(&event_log); - call_config.trials = new FieldTrialBasedConfig(); + call_config.trials = field_trials_.get(); - std::unique_ptr task_queue_factory = - webrtc::CreateDefaultTaskQueueFactory(call_config.trials); - auto worker_thread = task_queue_factory->CreateTaskQueue( - "worker_thread", TaskQueueFactory::Priority::NORMAL); rtc::Event sync_event(/*manual_reset=*/false, /*initially_signalled=*/false); - call_config.task_queue_factory = task_queue_factory.get(); + call_config.task_queue_factory = task_queue_factory_.get(); // Creation of the streams must happen inside a task queue because it is // resued as a worker thread. std::unique_ptr call; std::unique_ptr stream_state; - worker_thread->PostTask([&]() { + worker_thread_->PostTask([&]() { call.reset(Call::Create(call_config)); // Attempt to load the configuration - if (replay_config_path.empty()) { - stream_state = ConfigureFromFlags(rtp_dump_path, call.get()); + if (replay_config_path_.empty()) { + stream_state = ConfigureFromFlags(rtp_dump_path_, call.get()); } else { - stream_state = ConfigureFromFile(replay_config_path, call.get()); + stream_state = ConfigureFromFile(replay_config_path_, call.get()); } if (stream_state == nullptr) { @@ -304,18 +481,18 @@ class RtpReplayer final { // Attempt to create an RtpReader from the input file. std::unique_ptr rtp_reader = - CreateRtpReader(rtp_dump_path); + CreateRtpReader(rtp_dump_path_); // Wait for streams creation. sync_event.Wait(/*give_up_after=*/TimeDelta::Seconds(10)); if (stream_state != nullptr && rtp_reader != nullptr) { - ReplayPackets(call.get(), rtp_reader.get(), worker_thread.get()); + ReplayPackets(call.get(), rtp_reader.get()); } // Destruction of streams and the call must happen on the same thread as // their creation. - worker_thread->PostTask([&]() { + worker_thread_->PostTask([&]() { for (const auto& receive_stream : stream_state->receive_streams) { call->DestroyVideoReceiveStream(receive_stream); } @@ -329,183 +506,7 @@ class RtpReplayer final { } private: - // Holds all the shared memory structures required for a receive stream. This - // structure is used to prevent members being deallocated before the replay - // has been finished. - struct StreamState { - test::NullTransport transport; - std::vector>> sinks; - std::vector receive_streams; - std::vector flexfec_streams; - std::unique_ptr decoder_factory; - }; - - // Loads multiple configurations from the provided configuration file. - static std::unique_ptr ConfigureFromFile( - const std::string& config_path, - Call* call) { - auto stream_state = std::make_unique(); - // Parse the configuration file. - std::ifstream config_file(config_path); - std::stringstream raw_json_buffer; - raw_json_buffer << config_file.rdbuf(); - std::string raw_json = raw_json_buffer.str(); - Json::CharReaderBuilder builder; - Json::Value json_configs; - std::string error_message; - std::unique_ptr json_reader(builder.newCharReader()); - if (!json_reader->parse(raw_json.data(), raw_json.data() + raw_json.size(), - &json_configs, &error_message)) { - fprintf(stderr, "Error parsing JSON config\n"); - fprintf(stderr, "%s\n", error_message.c_str()); - return nullptr; - } - - stream_state->decoder_factory = std::make_unique(); - size_t config_count = 0; - for (const auto& json : json_configs) { - // Create the configuration and parse the JSON into the config. - auto receive_config = - ParseVideoReceiveStreamJsonConfig(&(stream_state->transport), json); - // Instantiate the underlying decoder. - for (auto& decoder : receive_config.decoders) { - decoder = test::CreateMatchingDecoder(decoder.payload_type, - decoder.video_format.name); - } - // Create a window for this config. - std::stringstream window_title; - window_title << "Playback Video (" << config_count++ << ")"; - stream_state->sinks.emplace_back(test::VideoRenderer::Create( - window_title.str().c_str(), absl::GetFlag(FLAGS_render_width), - absl::GetFlag(FLAGS_render_height))); - // Create a receive stream for this config. - receive_config.renderer = stream_state->sinks.back().get(); - receive_config.decoder_factory = stream_state->decoder_factory.get(); - stream_state->receive_streams.emplace_back( - call->CreateVideoReceiveStream(std::move(receive_config))); - } - return stream_state; - } - - // Loads the base configuration from flags passed in on the commandline. - static std::unique_ptr ConfigureFromFlags( - const std::string& rtp_dump_path, - Call* call) { - auto stream_state = std::make_unique(); - // Create the video renderers. We must add both to the stream state to keep - // them from deallocating. - std::stringstream window_title; - window_title << "Playback Video (" << rtp_dump_path << ")"; - std::unique_ptr playback_video( - test::VideoRenderer::Create(window_title.str().c_str(), - absl::GetFlag(FLAGS_render_width), - absl::GetFlag(FLAGS_render_height))); - auto file_passthrough = std::make_unique( - absl::GetFlag(FLAGS_out_base), playback_video.get()); - stream_state->sinks.push_back(std::move(playback_video)); - stream_state->sinks.push_back(std::move(file_passthrough)); - // Setup the configuration from the flags. - VideoReceiveStreamInterface::Config receive_config( - &(stream_state->transport)); - receive_config.rtp.remote_ssrc = absl::GetFlag(FLAGS_ssrc); - receive_config.rtp.local_ssrc = kReceiverLocalSsrc; - receive_config.rtp.rtx_ssrc = absl::GetFlag(FLAGS_ssrc_rtx); - receive_config.rtp.rtx_associated_payload_types[absl::GetFlag( - FLAGS_media_payload_type_rtx)] = - absl::GetFlag(FLAGS_media_payload_type); - receive_config.rtp.rtx_associated_payload_types[absl::GetFlag( - FLAGS_red_payload_type_rtx)] = absl::GetFlag(FLAGS_red_payload_type); - receive_config.rtp.ulpfec_payload_type = - absl::GetFlag(FLAGS_ulpfec_payload_type); - receive_config.rtp.red_payload_type = absl::GetFlag(FLAGS_red_payload_type); - receive_config.rtp.nack.rtp_history_ms = 1000; - - if (absl::GetFlag(FLAGS_flexfec_payload_type) != -1) { - receive_config.rtp.protected_by_flexfec = true; - webrtc::FlexfecReceiveStream::Config flexfec_config( - &(stream_state->transport)); - flexfec_config.payload_type = absl::GetFlag(FLAGS_flexfec_payload_type); - flexfec_config.protected_media_ssrcs.push_back(absl::GetFlag(FLAGS_ssrc)); - flexfec_config.rtp.remote_ssrc = absl::GetFlag(FLAGS_ssrc_flexfec); - FlexfecReceiveStream* flexfec_stream = - call->CreateFlexfecReceiveStream(flexfec_config); - receive_config.rtp.packet_sink_ = flexfec_stream; - stream_state->flexfec_streams.push_back(flexfec_stream); - } - - if (absl::GetFlag(FLAGS_transmission_offset_id) != -1) { - receive_config.rtp.extensions.push_back( - RtpExtension(RtpExtension::kTimestampOffsetUri, - absl::GetFlag(FLAGS_transmission_offset_id))); - } - if (absl::GetFlag(FLAGS_abs_send_time_id) != -1) { - receive_config.rtp.extensions.push_back( - RtpExtension(RtpExtension::kAbsSendTimeUri, - absl::GetFlag(FLAGS_abs_send_time_id))); - } - receive_config.renderer = stream_state->sinks.back().get(); - - // Setup the receiving stream - VideoReceiveStreamInterface::Decoder decoder; - decoder = test::CreateMatchingDecoder( - absl::GetFlag(FLAGS_media_payload_type), absl::GetFlag(FLAGS_codec)); - if (!absl::GetFlag(FLAGS_decoder_bitstream_filename).empty()) { - // Replace decoder with file writer if we're writing the bitstream to a - // file instead. - stream_state->decoder_factory = - std::make_unique([]() { - return std::make_unique( - absl::GetFlag(FLAGS_decoder_bitstream_filename).c_str()); - }); - } else if (!absl::GetFlag(FLAGS_decoder_ivf_filename).empty()) { - // Replace decoder with file writer if we're writing the ivf to a - // file instead. - stream_state->decoder_factory = - std::make_unique([]() { - return std::make_unique( - absl::GetFlag(FLAGS_decoder_ivf_filename).c_str(), - absl::GetFlag(FLAGS_codec)); - }); - } else { - stream_state->decoder_factory = - std::make_unique(); - } - receive_config.decoder_factory = stream_state->decoder_factory.get(); - receive_config.decoders.push_back(decoder); - - stream_state->receive_streams.emplace_back( - call->CreateVideoReceiveStream(std::move(receive_config))); - return stream_state; - } - - static std::unique_ptr CreateRtpReader( - const std::string& rtp_dump_path) { - std::unique_ptr rtp_reader(test::RtpFileReader::Create( - test::RtpFileReader::kRtpDump, rtp_dump_path)); - if (!rtp_reader) { - rtp_reader.reset(test::RtpFileReader::Create(test::RtpFileReader::kPcap, - rtp_dump_path)); - if (!rtp_reader) { - fprintf( - stderr, - "Couldn't open input file as either a rtpdump or .pcap. Note " - "that .pcapng is not supported.\nTrying to interpret the file as " - "length/packet interleaved.\n"); - rtp_reader.reset(test::RtpFileReader::Create( - test::RtpFileReader::kLengthPacketInterleaved, rtp_dump_path)); - if (!rtp_reader) { - fprintf(stderr, - "Unable to open input file with any supported format\n"); - return nullptr; - } - } - } - return rtp_reader; - } - - static void ReplayPackets(Call* call, - test::RtpFileReader* rtp_reader, - TaskQueueBase* worker_thread) { + void ReplayPackets(Call* call, test::RtpFileReader* rtp_reader) { int64_t replay_start_ms = -1; int num_packets = 0; std::map unknown_packets; @@ -537,7 +538,7 @@ class RtpReplayer final { ++num_packets; PacketReceiver::DeliveryStatus result = PacketReceiver::DELIVERY_OK; - worker_thread->PostTask([&]() { + worker_thread_->PostTask([&]() { MediaType media_type = IsRtcpPacket(packet_buffer) ? MediaType::ANY : MediaType::VIDEO; result = call->Receiver()->DeliverPacket(media_type, @@ -573,11 +574,19 @@ class RtpReplayer final { it->second); } } + + const std::string replay_config_path_; + const std::string rtp_dump_path_; + std::unique_ptr field_trials_; + std::unique_ptr task_queue_factory_; + std::unique_ptr worker_thread_; }; // class RtpReplayer void RtpReplay() { - RtpReplayer::Replay(absl::GetFlag(FLAGS_config_file), - absl::GetFlag(FLAGS_input_file)); + RtpReplayer replayer(absl::GetFlag(FLAGS_config_file), + absl::GetFlag(FLAGS_input_file), + std::make_unique()); + replayer.Run(); } } // namespace