Use a task queue for call interaction in video_replay tool

After some recent change current thread while creating the receive stream is
used as a task queue for stats calculation.

Currently, video_replay tool doesn't create streams inside a task queue, so
it ends up posting tasks to a "dead" task queue, which doesn't run message
processing loop at all.

Bug: webrtc:12204
Change-Id: Ieb97a10f44a11e92e2ac08df5b39b7cd695c852e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/196860
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Philipp Hancke <philipp.hancke@googlemail.com>
Commit-Queue: Ilya Nikolaevskiy <ilnik@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32824}
This commit is contained in:
Ilya Nikolaevskiy
2020-12-11 11:07:01 +01:00
committed by Commit Bot
parent 9ff23bad06
commit 6a646905b9

View File

@ -265,35 +265,64 @@ class RtpReplayer final {
const std::string& rtp_dump_path) {
std::unique_ptr<webrtc::TaskQueueFactory> task_queue_factory =
webrtc::CreateDefaultTaskQueueFactory();
auto worker_thread = task_queue_factory->CreateTaskQueue(
"worker_thread", TaskQueueFactory::Priority::NORMAL);
rtc::Event sync_event(/*manual_reset=*/false,
/*initially_signalled=*/false);
webrtc::RtcEventLogNull event_log;
Call::Config call_config(&event_log);
call_config.task_queue_factory = task_queue_factory.get();
call_config.trials = new FieldTrialBasedConfig();
std::unique_ptr<Call> call(Call::Create(call_config));
std::unique_ptr<Call> call;
std::unique_ptr<StreamState> stream_state;
// Creation of the streams must happen inside a task queue because it is
// resued as a worker thread.
worker_thread->PostTask(ToQueuedTask([&]() {
call.reset(Call::Create(call_config));
// Attempt to load the configuration
if (replay_config_path.empty()) {
stream_state = ConfigureFromFlags(rtp_dump_path, call.get());
} else {
stream_state = ConfigureFromFile(replay_config_path, call.get());
}
if (stream_state == nullptr) {
return;
}
// Attempt to create an RtpReader from the input file.
std::unique_ptr<test::RtpFileReader> rtp_reader =
CreateRtpReader(rtp_dump_path);
if (rtp_reader == nullptr) {
return;
}
// Start replaying the provided stream now that it has been configured.
// VideoReceiveStreams must be started on the same thread as they were
// created on.
for (const auto& receive_stream : stream_state->receive_streams) {
receive_stream->Start();
}
ReplayPackets(call.get(), rtp_reader.get());
sync_event.Set();
}));
// Attempt to create an RtpReader from the input file.
std::unique_ptr<test::RtpFileReader> rtp_reader =
CreateRtpReader(rtp_dump_path);
// Wait for streams creation.
sync_event.Wait(/*give_up_after_ms=*/10000);
if (stream_state == nullptr || rtp_reader == nullptr) {
return;
}
ReplayPackets(call.get(), rtp_reader.get(), worker_thread.get());
// Destruction of streams and the call must happen on the same thread as
// their creation.
worker_thread->PostTask(ToQueuedTask([&]() {
for (const auto& receive_stream : stream_state->receive_streams) {
call->DestroyVideoReceiveStream(receive_stream);
}
call.reset();
sync_event.Set();
}));
sync_event.Wait(/*give_up_after_ms=*/10000);
}
private:
@ -435,10 +464,13 @@ class RtpReplayer final {
return rtp_reader;
}
static void ReplayPackets(Call* call, test::RtpFileReader* rtp_reader) {
static void ReplayPackets(Call* call,
test::RtpFileReader* rtp_reader,
TaskQueueBase* worker_thread) {
int64_t replay_start_ms = -1;
int num_packets = 0;
std::map<uint32_t, int> unknown_packets;
rtc::Event event(/*manual_reset=*/false, /*initially_signalled=*/false);
while (true) {
int64_t now_ms = rtc::TimeMillis();
if (replay_start_ms == -1) {
@ -456,10 +488,16 @@ class RtpReplayer final {
}
++num_packets;
switch (call->Receiver()->DeliverPacket(
PacketReceiver::DeliveryStatus result = PacketReceiver::DELIVERY_OK;
worker_thread->PostTask(ToQueuedTask([&]() {
result = call->Receiver()->DeliverPacket(
webrtc::MediaType::VIDEO,
rtc::CopyOnWriteBuffer(packet.data, packet.length),
/* packet_time_us */ -1)) {
/* packet_time_us */ -1);
event.Set();
}));
event.Wait(/*give_up_after_ms=*/10000);
switch (result) {
case PacketReceiver::DELIVERY_OK:
break;
case PacketReceiver::DELIVERY_UNKNOWN_SSRC: {