Start making RtpReplayer into an actual class.

This is to simplify the implementation of new feature flags.

 - Move helper functions to the anonymous namespace.
 - Add members to avoid passing everything by function parameters.
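
A rough sketch of the resulting shape, condensed from the diff below
(names and signatures are taken from the change itself; bodies omitted):

  class RtpReplayer final {
   public:
    RtpReplayer(absl::string_view replay_config_path,
                absl::string_view rtp_dump_path,
                std::unique_ptr<FieldTrialsView> field_trials);
    void Run();

   private:
    void ReplayPackets(Call* call, test::RtpFileReader* rtp_reader);

    const std::string replay_config_path_;
    const std::string rtp_dump_path_;
    std::unique_ptr<FieldTrialsView> field_trials_;
    std::unique_ptr<TaskQueueFactory> task_queue_factory_;
    std::unique_ptr<rtc::TaskQueue> worker_thread_;
  };

  // Callers construct the replayer with explicit field trials instead of
  // invoking the old static Replay():
  RtpReplayer replayer(absl::GetFlag(FLAGS_config_file),
                       absl::GetFlag(FLAGS_input_file),
                       std::make_unique<FieldTrialBasedConfig>());
  replayer.Run();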

Bug: webrtc:14508
Change-Id: I0a4958645a4eb76515f28d8ce868a66be6748919
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/277720
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38274}
Author: philipel
Date: 2022-10-03 11:56:58 +02:00
Committed-by: WebRTC LUCI CQ
Parent: c0b0494860
Commit: 5f55137a34

@@ -256,38 +256,215 @@ class DecoderIvfFileWriter : public test::FakeDecoder {
VideoCodecType video_codec_type_;
};
// The RtpReplayer is responsible for parsing the configuration provided by the
// user, setting up the windows, receive streams and decoders and then replaying
// the provided RTP dump.
// Holds all the shared memory structures required for a receive stream. This
// structure is used to prevent members being deallocated before the replay
// has been finished.
struct StreamState {
test::NullTransport transport;
std::vector<std::unique_ptr<rtc::VideoSinkInterface<VideoFrame>>> sinks;
std::vector<VideoReceiveStreamInterface*> receive_streams;
std::vector<FlexfecReceiveStream*> flexfec_streams;
std::unique_ptr<VideoDecoderFactory> decoder_factory;
};
// Loads multiple configurations from the provided configuration file.
std::unique_ptr<StreamState> ConfigureFromFile(const std::string& config_path,
Call* call) {
auto stream_state = std::make_unique<StreamState>();
// Parse the configuration file.
std::ifstream config_file(config_path);
std::stringstream raw_json_buffer;
raw_json_buffer << config_file.rdbuf();
std::string raw_json = raw_json_buffer.str();
Json::CharReaderBuilder builder;
Json::Value json_configs;
std::string error_message;
std::unique_ptr<Json::CharReader> json_reader(builder.newCharReader());
if (!json_reader->parse(raw_json.data(), raw_json.data() + raw_json.size(),
&json_configs, &error_message)) {
fprintf(stderr, "Error parsing JSON config\n");
fprintf(stderr, "%s\n", error_message.c_str());
return nullptr;
}
stream_state->decoder_factory = std::make_unique<InternalDecoderFactory>();
size_t config_count = 0;
for (const auto& json : json_configs) {
// Create the configuration and parse the JSON into the config.
auto receive_config =
ParseVideoReceiveStreamJsonConfig(&(stream_state->transport), json);
// Instantiate the underlying decoder.
for (auto& decoder : receive_config.decoders) {
decoder = test::CreateMatchingDecoder(decoder.payload_type,
decoder.video_format.name);
}
// Create a window for this config.
std::stringstream window_title;
window_title << "Playback Video (" << config_count++ << ")";
stream_state->sinks.emplace_back(test::VideoRenderer::Create(
window_title.str().c_str(), absl::GetFlag(FLAGS_render_width),
absl::GetFlag(FLAGS_render_height)));
// Create a receive stream for this config.
receive_config.renderer = stream_state->sinks.back().get();
receive_config.decoder_factory = stream_state->decoder_factory.get();
stream_state->receive_streams.emplace_back(
call->CreateVideoReceiveStream(std::move(receive_config)));
}
return stream_state;
}
// Loads the base configuration from flags passed in on the commandline.
std::unique_ptr<StreamState> ConfigureFromFlags(
const std::string& rtp_dump_path,
Call* call) {
auto stream_state = std::make_unique<StreamState>();
// Create the video renderers. We must add both to the stream state to keep
// them from deallocating.
std::stringstream window_title;
window_title << "Playback Video (" << rtp_dump_path << ")";
std::unique_ptr<test::VideoRenderer> playback_video(
test::VideoRenderer::Create(window_title.str().c_str(),
absl::GetFlag(FLAGS_render_width),
absl::GetFlag(FLAGS_render_height)));
auto file_passthrough = std::make_unique<FileRenderPassthrough>(
absl::GetFlag(FLAGS_out_base), playback_video.get());
stream_state->sinks.push_back(std::move(playback_video));
stream_state->sinks.push_back(std::move(file_passthrough));
// Setup the configuration from the flags.
VideoReceiveStreamInterface::Config receive_config(
&(stream_state->transport));
receive_config.rtp.remote_ssrc = absl::GetFlag(FLAGS_ssrc);
receive_config.rtp.local_ssrc = kReceiverLocalSsrc;
receive_config.rtp.rtx_ssrc = absl::GetFlag(FLAGS_ssrc_rtx);
receive_config.rtp.rtx_associated_payload_types[absl::GetFlag(
FLAGS_media_payload_type_rtx)] = absl::GetFlag(FLAGS_media_payload_type);
receive_config.rtp
.rtx_associated_payload_types[absl::GetFlag(FLAGS_red_payload_type_rtx)] =
absl::GetFlag(FLAGS_red_payload_type);
receive_config.rtp.ulpfec_payload_type =
absl::GetFlag(FLAGS_ulpfec_payload_type);
receive_config.rtp.red_payload_type = absl::GetFlag(FLAGS_red_payload_type);
receive_config.rtp.nack.rtp_history_ms = 1000;
if (absl::GetFlag(FLAGS_flexfec_payload_type) != -1) {
receive_config.rtp.protected_by_flexfec = true;
webrtc::FlexfecReceiveStream::Config flexfec_config(
&(stream_state->transport));
flexfec_config.payload_type = absl::GetFlag(FLAGS_flexfec_payload_type);
flexfec_config.protected_media_ssrcs.push_back(absl::GetFlag(FLAGS_ssrc));
flexfec_config.rtp.remote_ssrc = absl::GetFlag(FLAGS_ssrc_flexfec);
FlexfecReceiveStream* flexfec_stream =
call->CreateFlexfecReceiveStream(flexfec_config);
receive_config.rtp.packet_sink_ = flexfec_stream;
stream_state->flexfec_streams.push_back(flexfec_stream);
}
if (absl::GetFlag(FLAGS_transmission_offset_id) != -1) {
receive_config.rtp.extensions.push_back(
RtpExtension(RtpExtension::kTimestampOffsetUri,
absl::GetFlag(FLAGS_transmission_offset_id)));
}
if (absl::GetFlag(FLAGS_abs_send_time_id) != -1) {
receive_config.rtp.extensions.push_back(RtpExtension(
RtpExtension::kAbsSendTimeUri, absl::GetFlag(FLAGS_abs_send_time_id)));
}
receive_config.renderer = stream_state->sinks.back().get();
// Setup the receiving stream
VideoReceiveStreamInterface::Decoder decoder;
decoder = test::CreateMatchingDecoder(absl::GetFlag(FLAGS_media_payload_type),
absl::GetFlag(FLAGS_codec));
if (!absl::GetFlag(FLAGS_decoder_bitstream_filename).empty()) {
// Replace decoder with file writer if we're writing the bitstream to a
// file instead.
stream_state->decoder_factory =
std::make_unique<test::FunctionVideoDecoderFactory>([]() {
return std::make_unique<DecoderBitstreamFileWriter>(
absl::GetFlag(FLAGS_decoder_bitstream_filename).c_str());
});
} else if (!absl::GetFlag(FLAGS_decoder_ivf_filename).empty()) {
// Replace decoder with file writer if we're writing the ivf to a
// file instead.
stream_state->decoder_factory =
std::make_unique<test::FunctionVideoDecoderFactory>([]() {
return std::make_unique<DecoderIvfFileWriter>(
absl::GetFlag(FLAGS_decoder_ivf_filename).c_str(),
absl::GetFlag(FLAGS_codec));
});
} else {
stream_state->decoder_factory = std::make_unique<InternalDecoderFactory>();
}
receive_config.decoder_factory = stream_state->decoder_factory.get();
receive_config.decoders.push_back(decoder);
stream_state->receive_streams.emplace_back(
call->CreateVideoReceiveStream(std::move(receive_config)));
return stream_state;
}
std::unique_ptr<test::RtpFileReader> CreateRtpReader(
const std::string& rtp_dump_path) {
std::unique_ptr<test::RtpFileReader> rtp_reader(test::RtpFileReader::Create(
test::RtpFileReader::kRtpDump, rtp_dump_path));
if (!rtp_reader) {
rtp_reader.reset(
test::RtpFileReader::Create(test::RtpFileReader::kPcap, rtp_dump_path));
if (!rtp_reader) {
fprintf(stderr,
"Couldn't open input file as either a rtpdump or .pcap. Note "
"that .pcapng is not supported.\nTrying to interpret the file as "
"length/packet interleaved.\n");
rtp_reader.reset(test::RtpFileReader::Create(
test::RtpFileReader::kLengthPacketInterleaved, rtp_dump_path));
if (!rtp_reader) {
fprintf(stderr,
"Unable to open input file with any supported format\n");
return nullptr;
}
}
}
return rtp_reader;
}
// The RtpReplayer is responsible for parsing the configuration provided by
// the user, setting up the windows, receive streams and decoders and then
// replaying the provided RTP dump.
class RtpReplayer final {
public:
// Replay a rtp dump with an optional json configuration.
static void Replay(const std::string& replay_config_path,
const std::string& rtp_dump_path) {
RtpReplayer(absl::string_view replay_config_path,
absl::string_view rtp_dump_path,
std::unique_ptr<FieldTrialsView> field_trials)
: replay_config_path_(replay_config_path),
rtp_dump_path_(rtp_dump_path),
field_trials_(std::move(field_trials)),
task_queue_factory_(
webrtc::CreateDefaultTaskQueueFactory(field_trials_.get())),
worker_thread_(std::make_unique<rtc::TaskQueue>(
task_queue_factory_->CreateTaskQueue(
"worker_thread",
TaskQueueFactory::Priority::NORMAL))) {}
void Run() {
webrtc::RtcEventLogNull event_log;
Call::Config call_config(&event_log);
call_config.trials = new FieldTrialBasedConfig();
call_config.trials = field_trials_.get();
std::unique_ptr<webrtc::TaskQueueFactory> task_queue_factory =
webrtc::CreateDefaultTaskQueueFactory(call_config.trials);
auto worker_thread = task_queue_factory->CreateTaskQueue(
"worker_thread", TaskQueueFactory::Priority::NORMAL);
rtc::Event sync_event(/*manual_reset=*/false,
/*initially_signalled=*/false);
call_config.task_queue_factory = task_queue_factory.get();
call_config.task_queue_factory = task_queue_factory_.get();
// Creation of the streams must happen inside a task queue because it is
// reused as a worker thread.
std::unique_ptr<Call> call;
std::unique_ptr<StreamState> stream_state;
worker_thread->PostTask([&]() {
worker_thread_->PostTask([&]() {
call.reset(Call::Create(call_config));
// Attempt to load the configuration
if (replay_config_path.empty()) {
stream_state = ConfigureFromFlags(rtp_dump_path, call.get());
if (replay_config_path_.empty()) {
stream_state = ConfigureFromFlags(rtp_dump_path_, call.get());
} else {
stream_state = ConfigureFromFile(replay_config_path, call.get());
stream_state = ConfigureFromFile(replay_config_path_, call.get());
}
if (stream_state == nullptr) {
@@ -304,18 +481,18 @@ class RtpReplayer final {
// Attempt to create an RtpReader from the input file.
std::unique_ptr<test::RtpFileReader> rtp_reader =
CreateRtpReader(rtp_dump_path);
CreateRtpReader(rtp_dump_path_);
// Wait for streams creation.
sync_event.Wait(/*give_up_after=*/TimeDelta::Seconds(10));
if (stream_state != nullptr && rtp_reader != nullptr) {
ReplayPackets(call.get(), rtp_reader.get(), worker_thread.get());
ReplayPackets(call.get(), rtp_reader.get());
}
// Destruction of streams and the call must happen on the same thread as
// their creation.
worker_thread->PostTask([&]() {
worker_thread_->PostTask([&]() {
for (const auto& receive_stream : stream_state->receive_streams) {
call->DestroyVideoReceiveStream(receive_stream);
}
@@ -329,183 +506,7 @@ class RtpReplayer final {
}
private:
// Holds all the shared memory structures required for a receive stream. This
// structure is used to prevent members being deallocated before the replay
// has been finished.
struct StreamState {
test::NullTransport transport;
std::vector<std::unique_ptr<rtc::VideoSinkInterface<VideoFrame>>> sinks;
std::vector<VideoReceiveStreamInterface*> receive_streams;
std::vector<FlexfecReceiveStream*> flexfec_streams;
std::unique_ptr<VideoDecoderFactory> decoder_factory;
};
// Loads multiple configurations from the provided configuration file.
static std::unique_ptr<StreamState> ConfigureFromFile(
const std::string& config_path,
Call* call) {
auto stream_state = std::make_unique<StreamState>();
// Parse the configuration file.
std::ifstream config_file(config_path);
std::stringstream raw_json_buffer;
raw_json_buffer << config_file.rdbuf();
std::string raw_json = raw_json_buffer.str();
Json::CharReaderBuilder builder;
Json::Value json_configs;
std::string error_message;
std::unique_ptr<Json::CharReader> json_reader(builder.newCharReader());
if (!json_reader->parse(raw_json.data(), raw_json.data() + raw_json.size(),
&json_configs, &error_message)) {
fprintf(stderr, "Error parsing JSON config\n");
fprintf(stderr, "%s\n", error_message.c_str());
return nullptr;
}
stream_state->decoder_factory = std::make_unique<InternalDecoderFactory>();
size_t config_count = 0;
for (const auto& json : json_configs) {
// Create the configuration and parse the JSON into the config.
auto receive_config =
ParseVideoReceiveStreamJsonConfig(&(stream_state->transport), json);
// Instantiate the underlying decoder.
for (auto& decoder : receive_config.decoders) {
decoder = test::CreateMatchingDecoder(decoder.payload_type,
decoder.video_format.name);
}
// Create a window for this config.
std::stringstream window_title;
window_title << "Playback Video (" << config_count++ << ")";
stream_state->sinks.emplace_back(test::VideoRenderer::Create(
window_title.str().c_str(), absl::GetFlag(FLAGS_render_width),
absl::GetFlag(FLAGS_render_height)));
// Create a receive stream for this config.
receive_config.renderer = stream_state->sinks.back().get();
receive_config.decoder_factory = stream_state->decoder_factory.get();
stream_state->receive_streams.emplace_back(
call->CreateVideoReceiveStream(std::move(receive_config)));
}
return stream_state;
}
// Loads the base configuration from flags passed in on the commandline.
static std::unique_ptr<StreamState> ConfigureFromFlags(
const std::string& rtp_dump_path,
Call* call) {
auto stream_state = std::make_unique<StreamState>();
// Create the video renderers. We must add both to the stream state to keep
// them from deallocating.
std::stringstream window_title;
window_title << "Playback Video (" << rtp_dump_path << ")";
std::unique_ptr<test::VideoRenderer> playback_video(
test::VideoRenderer::Create(window_title.str().c_str(),
absl::GetFlag(FLAGS_render_width),
absl::GetFlag(FLAGS_render_height)));
auto file_passthrough = std::make_unique<FileRenderPassthrough>(
absl::GetFlag(FLAGS_out_base), playback_video.get());
stream_state->sinks.push_back(std::move(playback_video));
stream_state->sinks.push_back(std::move(file_passthrough));
// Setup the configuration from the flags.
VideoReceiveStreamInterface::Config receive_config(
&(stream_state->transport));
receive_config.rtp.remote_ssrc = absl::GetFlag(FLAGS_ssrc);
receive_config.rtp.local_ssrc = kReceiverLocalSsrc;
receive_config.rtp.rtx_ssrc = absl::GetFlag(FLAGS_ssrc_rtx);
receive_config.rtp.rtx_associated_payload_types[absl::GetFlag(
FLAGS_media_payload_type_rtx)] =
absl::GetFlag(FLAGS_media_payload_type);
receive_config.rtp.rtx_associated_payload_types[absl::GetFlag(
FLAGS_red_payload_type_rtx)] = absl::GetFlag(FLAGS_red_payload_type);
receive_config.rtp.ulpfec_payload_type =
absl::GetFlag(FLAGS_ulpfec_payload_type);
receive_config.rtp.red_payload_type = absl::GetFlag(FLAGS_red_payload_type);
receive_config.rtp.nack.rtp_history_ms = 1000;
if (absl::GetFlag(FLAGS_flexfec_payload_type) != -1) {
receive_config.rtp.protected_by_flexfec = true;
webrtc::FlexfecReceiveStream::Config flexfec_config(
&(stream_state->transport));
flexfec_config.payload_type = absl::GetFlag(FLAGS_flexfec_payload_type);
flexfec_config.protected_media_ssrcs.push_back(absl::GetFlag(FLAGS_ssrc));
flexfec_config.rtp.remote_ssrc = absl::GetFlag(FLAGS_ssrc_flexfec);
FlexfecReceiveStream* flexfec_stream =
call->CreateFlexfecReceiveStream(flexfec_config);
receive_config.rtp.packet_sink_ = flexfec_stream;
stream_state->flexfec_streams.push_back(flexfec_stream);
}
if (absl::GetFlag(FLAGS_transmission_offset_id) != -1) {
receive_config.rtp.extensions.push_back(
RtpExtension(RtpExtension::kTimestampOffsetUri,
absl::GetFlag(FLAGS_transmission_offset_id)));
}
if (absl::GetFlag(FLAGS_abs_send_time_id) != -1) {
receive_config.rtp.extensions.push_back(
RtpExtension(RtpExtension::kAbsSendTimeUri,
absl::GetFlag(FLAGS_abs_send_time_id)));
}
receive_config.renderer = stream_state->sinks.back().get();
// Setup the receiving stream
VideoReceiveStreamInterface::Decoder decoder;
decoder = test::CreateMatchingDecoder(
absl::GetFlag(FLAGS_media_payload_type), absl::GetFlag(FLAGS_codec));
if (!absl::GetFlag(FLAGS_decoder_bitstream_filename).empty()) {
// Replace decoder with file writer if we're writing the bitstream to a
// file instead.
stream_state->decoder_factory =
std::make_unique<test::FunctionVideoDecoderFactory>([]() {
return std::make_unique<DecoderBitstreamFileWriter>(
absl::GetFlag(FLAGS_decoder_bitstream_filename).c_str());
});
} else if (!absl::GetFlag(FLAGS_decoder_ivf_filename).empty()) {
// Replace decoder with file writer if we're writing the ivf to a
// file instead.
stream_state->decoder_factory =
std::make_unique<test::FunctionVideoDecoderFactory>([]() {
return std::make_unique<DecoderIvfFileWriter>(
absl::GetFlag(FLAGS_decoder_ivf_filename).c_str(),
absl::GetFlag(FLAGS_codec));
});
} else {
stream_state->decoder_factory =
std::make_unique<InternalDecoderFactory>();
}
receive_config.decoder_factory = stream_state->decoder_factory.get();
receive_config.decoders.push_back(decoder);
stream_state->receive_streams.emplace_back(
call->CreateVideoReceiveStream(std::move(receive_config)));
return stream_state;
}
static std::unique_ptr<test::RtpFileReader> CreateRtpReader(
const std::string& rtp_dump_path) {
std::unique_ptr<test::RtpFileReader> rtp_reader(test::RtpFileReader::Create(
test::RtpFileReader::kRtpDump, rtp_dump_path));
if (!rtp_reader) {
rtp_reader.reset(test::RtpFileReader::Create(test::RtpFileReader::kPcap,
rtp_dump_path));
if (!rtp_reader) {
fprintf(
stderr,
"Couldn't open input file as either a rtpdump or .pcap. Note "
"that .pcapng is not supported.\nTrying to interpret the file as "
"length/packet interleaved.\n");
rtp_reader.reset(test::RtpFileReader::Create(
test::RtpFileReader::kLengthPacketInterleaved, rtp_dump_path));
if (!rtp_reader) {
fprintf(stderr,
"Unable to open input file with any supported format\n");
return nullptr;
}
}
}
return rtp_reader;
}
static void ReplayPackets(Call* call,
test::RtpFileReader* rtp_reader,
TaskQueueBase* worker_thread) {
void ReplayPackets(Call* call, test::RtpFileReader* rtp_reader) {
int64_t replay_start_ms = -1;
int num_packets = 0;
std::map<uint32_t, int> unknown_packets;
@@ -537,7 +538,7 @@ class RtpReplayer final {
++num_packets;
PacketReceiver::DeliveryStatus result = PacketReceiver::DELIVERY_OK;
worker_thread->PostTask([&]() {
worker_thread_->PostTask([&]() {
MediaType media_type =
IsRtcpPacket(packet_buffer) ? MediaType::ANY : MediaType::VIDEO;
result = call->Receiver()->DeliverPacket(media_type,
@@ -573,11 +574,19 @@ class RtpReplayer final {
it->second);
}
}
const std::string replay_config_path_;
const std::string rtp_dump_path_;
std::unique_ptr<FieldTrialsView> field_trials_;
std::unique_ptr<TaskQueueFactory> task_queue_factory_;
std::unique_ptr<rtc::TaskQueue> worker_thread_;
}; // class RtpReplayer
void RtpReplay() {
RtpReplayer::Replay(absl::GetFlag(FLAGS_config_file),
absl::GetFlag(FLAGS_input_file));
RtpReplayer replayer(absl::GetFlag(FLAGS_config_file),
absl::GetFlag(FLAGS_input_file),
std::make_unique<FieldTrialBasedConfig>());
replayer.Run();
}
} // namespace