Using TimeController for Scenario test framework
As part of this change, a task queue is used to handle packet processing in real time mode. This requires that we also do most call and media stream related operation on the same task queue to satisfy thread checkers. Bug: webrtc:10365 Change-Id: Icdd9d56e4ca14f2c944dc655c91e29392e3765f7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/127544 Commit-Queue: Sebastian Jansson <srte@webrtc.org> Reviewed-by: Artem Titov <titovartem@webrtc.org> Cr-Commit-Position: refs/heads/master@{#27379}
This commit is contained in:
committed by
Commit Bot
parent
10aac06886
commit
105a10aef0
@ -129,10 +129,12 @@ if (rtc_include_tests) {
|
||||
"../../rtc_base:safe_minmax",
|
||||
"../../rtc_base:sequenced_task_checker",
|
||||
"../../rtc_base:task_queue_for_test",
|
||||
"../../rtc_base/task_utils:repeating_task",
|
||||
"../../system_wrappers",
|
||||
"../../system_wrappers:field_trial",
|
||||
"../../video",
|
||||
"../logging:log_writer",
|
||||
"../time_controller",
|
||||
"network:emulated_network",
|
||||
"//third_party/abseil-cpp/absl/memory",
|
||||
"//third_party/abseil-cpp/absl/types:optional",
|
||||
|
||||
@ -126,24 +126,29 @@ SendAudioStream::SendAudioStream(
|
||||
send_config.track_id,
|
||||
config.encoder.priority_rate->bps<uint32_t>()));
|
||||
}
|
||||
send_stream_ = sender_->call_->CreateAudioSendStream(send_config);
|
||||
if (field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")) {
|
||||
sender->call_->OnAudioTransportOverheadChanged(
|
||||
sender_->transport_.packet_overhead().bytes());
|
||||
}
|
||||
sender_->SendTask([&] {
|
||||
send_stream_ = sender_->call_->CreateAudioSendStream(send_config);
|
||||
if (field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")) {
|
||||
sender->call_->OnAudioTransportOverheadChanged(
|
||||
sender_->transport_->packet_overhead().bytes());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SendAudioStream::~SendAudioStream() {
|
||||
sender_->call_->DestroyAudioSendStream(send_stream_);
|
||||
sender_->SendTask(
|
||||
[this] { sender_->call_->DestroyAudioSendStream(send_stream_); });
|
||||
}
|
||||
|
||||
void SendAudioStream::Start() {
|
||||
send_stream_->Start();
|
||||
sender_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
|
||||
sender_->SendTask([this] {
|
||||
send_stream_->Start();
|
||||
sender_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
|
||||
});
|
||||
}
|
||||
|
||||
void SendAudioStream::Stop() {
|
||||
send_stream_->Stop();
|
||||
sender_->SendTask([this] { send_stream_->Stop(); });
|
||||
}
|
||||
|
||||
void SendAudioStream::SetMuted(bool mute) {
|
||||
@ -154,8 +159,10 @@ ColumnPrinter SendAudioStream::StatsPrinter() {
|
||||
return ColumnPrinter::Lambda(
|
||||
"audio_target_rate",
|
||||
[this](rtc::SimpleStringBuilder& sb) {
|
||||
AudioSendStream::Stats stats = send_stream_->GetStats();
|
||||
sb.AppendFormat("%.0lf", stats.target_bitrate_bps / 8.0);
|
||||
sender_->SendTask([this, &sb] {
|
||||
AudioSendStream::Stats stats = send_stream_->GetStats();
|
||||
sb.AppendFormat("%.0lf", stats.target_bitrate_bps / 8.0);
|
||||
});
|
||||
},
|
||||
64);
|
||||
}
|
||||
@ -182,19 +189,24 @@ ReceiveAudioStream::ReceiveAudioStream(
|
||||
recv_config.decoder_map = {
|
||||
{CallTest::kAudioSendPayloadType, {"opus", 48000, 2}}};
|
||||
recv_config.sync_group = config.render.sync_group;
|
||||
receive_stream_ = receiver_->call_->CreateAudioReceiveStream(recv_config);
|
||||
receiver_->SendTask([&] {
|
||||
receive_stream_ = receiver_->call_->CreateAudioReceiveStream(recv_config);
|
||||
});
|
||||
}
|
||||
ReceiveAudioStream::~ReceiveAudioStream() {
|
||||
receiver_->call_->DestroyAudioReceiveStream(receive_stream_);
|
||||
receiver_->SendTask(
|
||||
[&] { receiver_->call_->DestroyAudioReceiveStream(receive_stream_); });
|
||||
}
|
||||
|
||||
void ReceiveAudioStream::Start() {
|
||||
receive_stream_->Start();
|
||||
receiver_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
|
||||
receiver_->SendTask([&] {
|
||||
receive_stream_->Start();
|
||||
receiver_->call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
|
||||
});
|
||||
}
|
||||
|
||||
void ReceiveAudioStream::Stop() {
|
||||
receive_stream_->Stop();
|
||||
receiver_->SendTask([&] { receive_stream_->Stop(); });
|
||||
}
|
||||
|
||||
AudioStreamPair::~AudioStreamPair() = default;
|
||||
@ -206,12 +218,12 @@ AudioStreamPair::AudioStreamPair(
|
||||
rtc::scoped_refptr<AudioDecoderFactory> decoder_factory,
|
||||
AudioStreamConfig config)
|
||||
: config_(config),
|
||||
send_stream_(sender, config, encoder_factory, &sender->transport_),
|
||||
send_stream_(sender, config, encoder_factory, sender->transport_.get()),
|
||||
receive_stream_(receiver,
|
||||
config,
|
||||
&send_stream_,
|
||||
decoder_factory,
|
||||
&receiver->transport_) {}
|
||||
receiver->transport_.get()) {}
|
||||
|
||||
} // namespace test
|
||||
} // namespace webrtc
|
||||
|
||||
@ -30,12 +30,13 @@ const uint32_t kReceiverLocalAudioSsrc = 0x1234567;
|
||||
|
||||
const char* kPriorityStreamId = "priority-track";
|
||||
|
||||
CallClientFakeAudio InitAudio() {
|
||||
CallClientFakeAudio InitAudio(TimeController* time_controller) {
|
||||
CallClientFakeAudio setup;
|
||||
auto capturer = TestAudioDeviceModule::CreatePulsedNoiseCapturer(256, 48000);
|
||||
auto renderer = TestAudioDeviceModule::CreateDiscardRenderer(48000);
|
||||
setup.fake_audio_device = TestAudioDeviceModule::CreateTestAudioDeviceModule(
|
||||
std::move(capturer), std::move(renderer), 1.f);
|
||||
setup.fake_audio_device = TestAudioDeviceModule::Create(
|
||||
time_controller->GetTaskQueueFactory(), std::move(capturer),
|
||||
std::move(renderer), 1.f);
|
||||
setup.apm = AudioProcessingBuilder().Create();
|
||||
setup.fake_audio_device->Init();
|
||||
AudioState::Config audio_state_config;
|
||||
@ -48,30 +49,37 @@ CallClientFakeAudio InitAudio() {
|
||||
return setup;
|
||||
}
|
||||
|
||||
Call* CreateCall(CallClientConfig config,
|
||||
LoggingNetworkControllerFactory* network_controller_factory_,
|
||||
Call* CreateCall(TimeController* time_controller,
|
||||
CallClientConfig config,
|
||||
LoggingNetworkControllerFactory* network_controller_factory,
|
||||
rtc::scoped_refptr<AudioState> audio_state) {
|
||||
CallConfig call_config(network_controller_factory_->GetEventLog());
|
||||
CallConfig call_config(network_controller_factory->GetEventLog());
|
||||
call_config.bitrate_config.max_bitrate_bps =
|
||||
config.transport.rates.max_rate.bps_or(-1);
|
||||
call_config.bitrate_config.min_bitrate_bps =
|
||||
config.transport.rates.min_rate.bps();
|
||||
call_config.bitrate_config.start_bitrate_bps =
|
||||
config.transport.rates.start_rate.bps();
|
||||
call_config.network_controller_factory = network_controller_factory_;
|
||||
call_config.network_controller_factory = network_controller_factory;
|
||||
call_config.audio_state = audio_state;
|
||||
return Call::Create(call_config);
|
||||
return Call::Create(call_config, time_controller->GetClock(),
|
||||
time_controller->CreateProcessThread("CallModules"),
|
||||
time_controller->CreateProcessThread("Pacer"),
|
||||
time_controller->GetTaskQueueFactory());
|
||||
}
|
||||
}
|
||||
|
||||
LoggingNetworkControllerFactory::LoggingNetworkControllerFactory(
|
||||
TimeController* time_controller,
|
||||
LogWriterFactoryInterface* log_writer_factory,
|
||||
TransportControllerConfig config) {
|
||||
TransportControllerConfig config)
|
||||
: time_controller_(time_controller) {
|
||||
std::unique_ptr<RtcEventLogOutput> cc_out;
|
||||
if (!log_writer_factory) {
|
||||
event_log_ = RtcEventLog::CreateNull();
|
||||
} else {
|
||||
event_log_ = RtcEventLog::Create(RtcEventLog::EncodingType::Legacy);
|
||||
event_log_ = RtcEventLog::Create(RtcEventLog::EncodingType::Legacy,
|
||||
time_controller->GetTaskQueueFactory());
|
||||
bool success = event_log_->StartLogging(
|
||||
log_writer_factory->Create(".rtc.dat"), RtcEventLog::kImmediateOutput);
|
||||
RTC_CHECK(success);
|
||||
@ -118,6 +126,7 @@ LoggingNetworkControllerFactory::LoggingNetworkControllerFactory(
|
||||
}
|
||||
|
||||
LoggingNetworkControllerFactory::~LoggingNetworkControllerFactory() {
|
||||
time_controller_->InvokeWithControlledYield([this]() { event_log_.reset(); });
|
||||
}
|
||||
|
||||
void LoggingNetworkControllerFactory::LogCongestionControllerStats(
|
||||
@ -140,21 +149,33 @@ TimeDelta LoggingNetworkControllerFactory::GetProcessInterval() const {
|
||||
}
|
||||
|
||||
CallClient::CallClient(
|
||||
Clock* clock,
|
||||
TimeController* time_controller,
|
||||
std::unique_ptr<LogWriterFactoryInterface> log_writer_factory,
|
||||
CallClientConfig config)
|
||||
: clock_(clock),
|
||||
: time_controller_(time_controller),
|
||||
clock_(time_controller->GetClock()),
|
||||
log_writer_factory_(std::move(log_writer_factory)),
|
||||
network_controller_factory_(log_writer_factory_.get(), config.transport),
|
||||
fake_audio_setup_(InitAudio()),
|
||||
call_(CreateCall(config,
|
||||
&network_controller_factory_,
|
||||
fake_audio_setup_.audio_state)),
|
||||
transport_(clock_, call_.get()),
|
||||
header_parser_(RtpHeaderParser::Create()) {}
|
||||
network_controller_factory_(time_controller,
|
||||
log_writer_factory_.get(),
|
||||
config.transport),
|
||||
header_parser_(RtpHeaderParser::Create()),
|
||||
task_queue_(time_controller->GetTaskQueueFactory()->CreateTaskQueue(
|
||||
"CallClient",
|
||||
TaskQueueFactory::Priority::NORMAL)) {
|
||||
SendTask([this, config] {
|
||||
fake_audio_setup_ = InitAudio(time_controller_);
|
||||
call_.reset(CreateCall(time_controller_, config,
|
||||
&network_controller_factory_,
|
||||
fake_audio_setup_.audio_state));
|
||||
transport_ = absl::make_unique<NetworkNodeTransport>(clock_, call_.get());
|
||||
});
|
||||
}
|
||||
|
||||
CallClient::~CallClient() {
|
||||
delete header_parser_;
|
||||
SendTask([&] {
|
||||
call_.reset();
|
||||
fake_audio_setup_ = {};
|
||||
});
|
||||
}
|
||||
|
||||
ColumnPrinter CallClient::StatsPrinter() {
|
||||
@ -185,8 +206,16 @@ void CallClient::OnPacketReceived(EmulatedIpPacket packet) {
|
||||
RTC_CHECK(ssrc.has_value());
|
||||
media_type = ssrc_media_types_[*ssrc];
|
||||
}
|
||||
call_->Receiver()->DeliverPacket(media_type, packet.data,
|
||||
packet.arrival_time.us());
|
||||
struct Closure {
|
||||
void operator()() {
|
||||
call->Receiver()->DeliverPacket(media_type, packet.data,
|
||||
packet.arrival_time.us());
|
||||
}
|
||||
Call* call;
|
||||
MediaType media_type;
|
||||
EmulatedIpPacket packet;
|
||||
};
|
||||
task_queue_.PostTask(Closure{call_.get(), media_type, std::move(packet)});
|
||||
}
|
||||
|
||||
std::unique_ptr<RtcEventLogOutput> CallClient::GetLogWriter(std::string name) {
|
||||
@ -232,6 +261,11 @@ void CallClient::AddExtensions(std::vector<RtpExtension> extensions) {
|
||||
header_parser_->RegisterRtpHeaderExtension(extension);
|
||||
}
|
||||
|
||||
void CallClient::SendTask(std::function<void()> task) {
|
||||
time_controller_->InvokeWithControlledYield(
|
||||
[&] { task_queue_.SendTask(std::move(task)); });
|
||||
}
|
||||
|
||||
CallClientPair::~CallClientPair() = default;
|
||||
|
||||
} // namespace test
|
||||
|
||||
@ -22,11 +22,13 @@
|
||||
#include "modules/congestion_controller/test/controller_printer.h"
|
||||
#include "modules/rtp_rtcp/include/rtp_header_parser.h"
|
||||
#include "rtc_base/constructor_magic.h"
|
||||
#include "rtc_base/task_queue_for_test.h"
|
||||
#include "test/logging/log_writer.h"
|
||||
#include "test/scenario/column_printer.h"
|
||||
#include "test/scenario/network/network_emulation.h"
|
||||
#include "test/scenario/network_node.h"
|
||||
#include "test/scenario/scenario_config.h"
|
||||
#include "test/time_controller/time_controller.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
@ -34,7 +36,8 @@ namespace test {
|
||||
class LoggingNetworkControllerFactory
|
||||
: public NetworkControllerFactoryInterface {
|
||||
public:
|
||||
LoggingNetworkControllerFactory(LogWriterFactoryInterface* log_writer_factory,
|
||||
LoggingNetworkControllerFactory(TimeController* time_controller,
|
||||
LogWriterFactoryInterface* log_writer_factory,
|
||||
TransportControllerConfig config);
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(LoggingNetworkControllerFactory);
|
||||
~LoggingNetworkControllerFactory();
|
||||
@ -46,6 +49,7 @@ class LoggingNetworkControllerFactory
|
||||
RtcEventLog* GetEventLog() const;
|
||||
|
||||
private:
|
||||
TimeController* time_controller_;
|
||||
std::unique_ptr<RtcEventLog> event_log_;
|
||||
std::unique_ptr<NetworkControllerFactoryInterface> owned_cc_factory_;
|
||||
NetworkControllerFactoryInterface* cc_factory_ = nullptr;
|
||||
@ -62,7 +66,7 @@ struct CallClientFakeAudio {
|
||||
// stream session.
|
||||
class CallClient : public EmulatedNetworkReceiverInterface {
|
||||
public:
|
||||
CallClient(Clock* clock,
|
||||
CallClient(TimeController* time_controller,
|
||||
std::unique_ptr<LogWriterFactoryInterface> log_writer_factory,
|
||||
CallClientConfig config);
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(CallClient);
|
||||
@ -94,14 +98,16 @@ class CallClient : public EmulatedNetworkReceiverInterface {
|
||||
uint32_t GetNextRtxSsrc();
|
||||
std::string GetNextPriorityId();
|
||||
void AddExtensions(std::vector<RtpExtension> extensions);
|
||||
void SendTask(std::function<void()> task);
|
||||
|
||||
TimeController* const time_controller_;
|
||||
Clock* clock_;
|
||||
const std::unique_ptr<LogWriterFactoryInterface> log_writer_factory_;
|
||||
LoggingNetworkControllerFactory network_controller_factory_;
|
||||
CallClientFakeAudio fake_audio_setup_;
|
||||
std::unique_ptr<Call> call_;
|
||||
NetworkNodeTransport transport_;
|
||||
RtpHeaderParser* const header_parser_;
|
||||
std::unique_ptr<NetworkNodeTransport> transport_;
|
||||
std::unique_ptr<RtpHeaderParser> const header_parser_;
|
||||
|
||||
std::unique_ptr<FecControllerFactoryInterface> fec_controller_factory_;
|
||||
// Stores the configured overhead per known destination endpoint. This is used
|
||||
@ -114,6 +120,8 @@ class CallClient : public EmulatedNetworkReceiverInterface {
|
||||
int next_audio_local_ssrc_index_ = 0;
|
||||
int next_priority_index_ = 0;
|
||||
std::map<uint32_t, MediaType> ssrc_media_types_;
|
||||
// Defined last so it's destroyed first.
|
||||
TaskQueueForTest task_queue_;
|
||||
};
|
||||
|
||||
class CallClientPair {
|
||||
|
||||
@ -19,6 +19,8 @@
|
||||
#include "test/logging/file_log_writer.h"
|
||||
#include "test/scenario/network/network_emulation.h"
|
||||
#include "test/testsupport/file_utils.h"
|
||||
#include "test/time_controller/real_time_controller.h"
|
||||
#include "test/time_controller/simulated_time_controller.h"
|
||||
|
||||
WEBRTC_DEFINE_bool(scenario_logs, false, "Save logs from scenario framework.");
|
||||
WEBRTC_DEFINE_string(scenario_logs_root,
|
||||
@ -28,7 +30,8 @@ WEBRTC_DEFINE_string(scenario_logs_root,
|
||||
namespace webrtc {
|
||||
namespace test {
|
||||
namespace {
|
||||
int64_t kMicrosPerSec = 1000000;
|
||||
const Timestamp kSimulatedStartTime = Timestamp::seconds(100000);
|
||||
|
||||
std::unique_ptr<FileLogWriterFactory> GetScenarioLogManager(
|
||||
std::string file_name) {
|
||||
if (FLAG_scenario_logs && !file_name.empty()) {
|
||||
@ -42,31 +45,14 @@ std::unique_ptr<FileLogWriterFactory> GetScenarioLogManager(
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
RepeatedActivity::RepeatedActivity(TimeDelta interval,
|
||||
std::function<void(TimeDelta)> function)
|
||||
: interval_(interval), function_(function) {}
|
||||
|
||||
void RepeatedActivity::Stop() {
|
||||
interval_ = TimeDelta::PlusInfinity();
|
||||
}
|
||||
|
||||
void RepeatedActivity::Poll(Timestamp time) {
|
||||
RTC_DCHECK(last_update_.IsFinite());
|
||||
if (time >= last_update_ + interval_) {
|
||||
function_(time - last_update_);
|
||||
last_update_ = time;
|
||||
std::unique_ptr<TimeController> CreateTimeController(bool real_time) {
|
||||
if (real_time) {
|
||||
return absl::make_unique<RealTimeController>();
|
||||
} else {
|
||||
return absl::make_unique<GlobalSimulatedTimeController>(
|
||||
kSimulatedStartTime);
|
||||
}
|
||||
}
|
||||
|
||||
void RepeatedActivity::SetStartTime(Timestamp time) {
|
||||
last_update_ = time;
|
||||
}
|
||||
|
||||
Timestamp RepeatedActivity::NextTime() {
|
||||
RTC_DCHECK(last_update_.IsFinite());
|
||||
return last_update_ + interval_;
|
||||
}
|
||||
|
||||
Scenario::Scenario()
|
||||
@ -81,22 +67,17 @@ Scenario::Scenario(
|
||||
std::unique_ptr<LogWriterFactoryInterface> log_writer_factory,
|
||||
bool real_time)
|
||||
: log_writer_factory_(std::move(log_writer_factory)),
|
||||
real_time_mode_(real_time),
|
||||
sim_clock_(100000 * kMicrosPerSec),
|
||||
clock_(real_time ? Clock::GetRealTimeClock() : &sim_clock_),
|
||||
time_controller_(CreateTimeController(real_time)),
|
||||
clock_(time_controller_->GetClock()),
|
||||
audio_decoder_factory_(CreateBuiltinAudioDecoderFactory()),
|
||||
audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()) {
|
||||
if (!real_time_mode_ && log_writer_factory_) {
|
||||
rtc::SetClockForTesting(&event_log_fake_clock_);
|
||||
event_log_fake_clock_.SetTimeNanos(sim_clock_.TimeInMicroseconds() * 1000);
|
||||
}
|
||||
}
|
||||
audio_encoder_factory_(CreateBuiltinAudioEncoderFactory()),
|
||||
task_queue_(time_controller_->GetTaskQueueFactory()->CreateTaskQueue(
|
||||
"Scenario",
|
||||
TaskQueueFactory::Priority::NORMAL)) {}
|
||||
|
||||
Scenario::~Scenario() {
|
||||
if (start_time_.IsFinite())
|
||||
Stop();
|
||||
if (!real_time_mode_)
|
||||
rtc::SetClockForTesting(nullptr);
|
||||
}
|
||||
|
||||
ColumnPrinter Scenario::TimePrinter() {
|
||||
@ -123,9 +104,8 @@ StatesPrinter* Scenario::CreatePrinter(std::string name,
|
||||
}
|
||||
|
||||
CallClient* Scenario::CreateClient(std::string name, CallClientConfig config) {
|
||||
RTC_DCHECK(real_time_mode_);
|
||||
CallClient* client =
|
||||
new CallClient(clock_, GetLogWriterFactory(name), config);
|
||||
new CallClient(time_controller_.get(), GetLogWriterFactory(name), config);
|
||||
if (config.transport.state_log_interval.IsFinite()) {
|
||||
Every(config.transport.state_log_interval, [this, client]() {
|
||||
client->network_controller_factory_.LogCongestionControllerStats(Now());
|
||||
@ -178,7 +158,7 @@ void Scenario::ChangeRoute(std::pair<CallClient*, CallClient*> clients,
|
||||
uint64_t route_id = next_route_id_++;
|
||||
clients.second->route_overhead_.insert({route_id, overhead});
|
||||
EmulatedNetworkNode::CreateRoute(route_id, over_nodes, clients.second);
|
||||
clients.first->transport_.Connect(over_nodes.front(), route_id, overhead);
|
||||
clients.first->transport_->Connect(over_nodes.front(), route_id, overhead);
|
||||
}
|
||||
|
||||
SimulatedTimeClient* Scenario::CreateSimulatedTimeClient(
|
||||
@ -190,17 +170,20 @@ SimulatedTimeClient* Scenario::CreateSimulatedTimeClient(
|
||||
uint64_t send_id = next_route_id_++;
|
||||
uint64_t return_id = next_route_id_++;
|
||||
SimulatedTimeClient* client = new SimulatedTimeClient(
|
||||
GetLogWriterFactory(name), config, stream_configs, send_link, return_link,
|
||||
send_id, return_id, Now());
|
||||
time_controller_.get(), GetLogWriterFactory(name), config, stream_configs,
|
||||
send_link, return_link, send_id, return_id, Now());
|
||||
if (log_writer_factory_ && !name.empty() &&
|
||||
config.transport.state_log_interval.IsFinite()) {
|
||||
Every(config.transport.state_log_interval, [this, client]() {
|
||||
client->network_controller_factory_.LogCongestionControllerStats(Now());
|
||||
});
|
||||
}
|
||||
|
||||
Every(client->GetNetworkControllerProcessInterval(),
|
||||
[this, client] { client->CongestionProcess(Now()); });
|
||||
if (client->GetNetworkControllerProcessInterval().IsFinite()) {
|
||||
Every(client->GetNetworkControllerProcessInterval(),
|
||||
[this, client] { client->CongestionProcess(Now()); });
|
||||
} else {
|
||||
task_queue_.PostTask([this, client] { client->CongestionProcess(Now()); });
|
||||
}
|
||||
Every(TimeDelta::ms(5), [this, client] { client->PacerProcess(Now()); });
|
||||
simulated_time_clients_.emplace_back(client);
|
||||
return client;
|
||||
@ -316,32 +299,34 @@ AudioStreamPair* Scenario::CreateAudioStream(
|
||||
|
||||
void Scenario::Every(TimeDelta interval,
|
||||
std::function<void(TimeDelta)> function) {
|
||||
repeated_activities_.emplace_back(new RepeatedActivity(interval, function));
|
||||
if (start_time_.IsFinite()) {
|
||||
repeated_activities_.back()->SetStartTime(Now());
|
||||
}
|
||||
RepeatingTaskHandle::DelayedStart(task_queue_.Get(), interval,
|
||||
[interval, function] {
|
||||
function(interval);
|
||||
return interval;
|
||||
});
|
||||
}
|
||||
|
||||
void Scenario::Every(TimeDelta interval, std::function<void()> function) {
|
||||
auto function_with_argument = [function](TimeDelta) { function(); };
|
||||
repeated_activities_.emplace_back(
|
||||
new RepeatedActivity(interval, function_with_argument));
|
||||
if (start_time_.IsFinite()) {
|
||||
repeated_activities_.back()->SetStartTime(Now());
|
||||
}
|
||||
RepeatingTaskHandle::DelayedStart(task_queue_.Get(), interval,
|
||||
[interval, function] {
|
||||
function();
|
||||
return interval;
|
||||
});
|
||||
}
|
||||
|
||||
void Scenario::At(TimeDelta offset, std::function<void()> function) {
|
||||
pending_activities_.emplace_back(new PendingActivity{offset, function});
|
||||
RTC_DCHECK_GT(offset.ms(), TimeSinceStart().ms());
|
||||
task_queue_.PostDelayedTask(function, TimeUntilTarget(offset).ms());
|
||||
}
|
||||
|
||||
void Scenario::RunFor(TimeDelta duration) {
|
||||
RunUntil(TimeSinceStart() + duration);
|
||||
if (start_time_.IsInfinite())
|
||||
Start();
|
||||
time_controller_->Sleep(duration);
|
||||
}
|
||||
|
||||
void Scenario::RunUntil(TimeDelta target_time_since_start) {
|
||||
RunUntil(target_time_since_start, TimeDelta::PlusInfinity(),
|
||||
[]() { return false; });
|
||||
RunFor(TimeUntilTarget(target_time_since_start));
|
||||
}
|
||||
|
||||
void Scenario::RunUntil(TimeDelta target_time_since_start,
|
||||
@ -349,43 +334,16 @@ void Scenario::RunUntil(TimeDelta target_time_since_start,
|
||||
std::function<bool()> exit_function) {
|
||||
if (start_time_.IsInfinite())
|
||||
Start();
|
||||
|
||||
rtc::Event done_;
|
||||
while (!exit_function() && TimeSinceStart() < target_time_since_start) {
|
||||
Timestamp current_time = Now();
|
||||
TimeDelta duration = current_time - start_time_;
|
||||
Timestamp next_time = current_time + check_interval;
|
||||
for (auto& activity : repeated_activities_) {
|
||||
activity->Poll(current_time);
|
||||
next_time = std::min(next_time, activity->NextTime());
|
||||
}
|
||||
for (auto activity = pending_activities_.begin();
|
||||
activity < pending_activities_.end(); activity++) {
|
||||
if (duration > (*activity)->after_duration) {
|
||||
(*activity)->function();
|
||||
pending_activities_.erase(activity);
|
||||
}
|
||||
}
|
||||
TimeDelta wait_time = next_time - current_time;
|
||||
if (real_time_mode_) {
|
||||
done_.Wait(wait_time.ms<int>());
|
||||
} else {
|
||||
sim_clock_.AdvanceTimeMicroseconds(wait_time.us());
|
||||
// The fake clock is quite slow to update, we only update it if logging is
|
||||
// turned on to save time.
|
||||
if (log_writer_factory_)
|
||||
event_log_fake_clock_.SetTimeNanos(sim_clock_.TimeInMicroseconds() *
|
||||
1000);
|
||||
}
|
||||
while (check_interval >= TimeUntilTarget(target_time_since_start)) {
|
||||
time_controller_->Sleep(check_interval);
|
||||
if (exit_function())
|
||||
return;
|
||||
}
|
||||
time_controller_->Sleep(TimeUntilTarget(target_time_since_start));
|
||||
}
|
||||
|
||||
void Scenario::Start() {
|
||||
start_time_ = Timestamp::us(clock_->TimeInMicroseconds());
|
||||
for (auto& activity : repeated_activities_) {
|
||||
activity->SetStartTime(start_time_);
|
||||
}
|
||||
|
||||
for (auto& stream_pair : video_streams_)
|
||||
stream_pair->receive()->Start();
|
||||
for (auto& stream_pair : audio_streams_)
|
||||
@ -426,5 +384,9 @@ TimeDelta Scenario::TimeSinceStart() {
|
||||
return Now() - start_time_;
|
||||
}
|
||||
|
||||
TimeDelta Scenario::TimeUntilTarget(TimeDelta target_time_offset) {
|
||||
return target_time_offset - TimeSinceStart();
|
||||
}
|
||||
|
||||
} // namespace test
|
||||
} // namespace webrtc
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
#include "absl/memory/memory.h"
|
||||
#include "rtc_base/constructor_magic.h"
|
||||
#include "rtc_base/fake_clock.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
#include "rtc_base/task_utils/repeating_task.h"
|
||||
#include "test/logging/log_writer.h"
|
||||
#include "test/scenario/audio_stream.h"
|
||||
#include "test/scenario/call_client.h"
|
||||
@ -25,33 +27,10 @@
|
||||
#include "test/scenario/scenario_config.h"
|
||||
#include "test/scenario/simulated_time.h"
|
||||
#include "test/scenario/video_stream.h"
|
||||
#include "test/time_controller/time_controller.h"
|
||||
|
||||
namespace webrtc {
|
||||
namespace test {
|
||||
// RepeatedActivity is created by the Scenario class and can be used to stop a
|
||||
// running activity at runtime.
|
||||
class RepeatedActivity {
|
||||
public:
|
||||
void Stop();
|
||||
|
||||
private:
|
||||
friend class Scenario;
|
||||
RepeatedActivity(TimeDelta interval, std::function<void(TimeDelta)> function);
|
||||
|
||||
void Poll(Timestamp time);
|
||||
void SetStartTime(Timestamp time);
|
||||
Timestamp NextTime();
|
||||
|
||||
TimeDelta interval_;
|
||||
std::function<void(TimeDelta)> function_;
|
||||
Timestamp last_update_ = Timestamp::MinusInfinity();
|
||||
};
|
||||
|
||||
struct PendingActivity {
|
||||
TimeDelta after_duration;
|
||||
std::function<void()> function;
|
||||
};
|
||||
|
||||
// Scenario is a class owning everything for a test scenario. It creates and
|
||||
// holds network nodes, call clients and media streams. It also provides methods
|
||||
// for changing behavior at runtime. Since it always keeps ownership of the
|
||||
@ -128,11 +107,14 @@ class Scenario {
|
||||
std::vector<EmulatedNetworkNode*> over_nodes,
|
||||
CrossTrafficConfig config);
|
||||
|
||||
// Runs the provided function with a fixed interval.
|
||||
// Runs the provided function with a fixed interval. For real time tests,
|
||||
// |function| starts being called after |interval| from the call to Every().
|
||||
void Every(TimeDelta interval, std::function<void(TimeDelta)> function);
|
||||
void Every(TimeDelta interval, std::function<void()> function);
|
||||
|
||||
// Runs the provided function after given duration has passed in a session.
|
||||
// Runs the provided function after given duration has passed. For real time
|
||||
// tests, |function| is called after |target_time_since_start| from the call
|
||||
// to Every().
|
||||
void At(TimeDelta offset, std::function<void()> function);
|
||||
|
||||
// Sends a packet over the nodes and runs |action| when it has been delivered.
|
||||
@ -140,13 +122,13 @@ class Scenario {
|
||||
size_t packet_size,
|
||||
std::function<void()> action);
|
||||
|
||||
// Runs the scenario for the given time or until the exit function returns
|
||||
// true.
|
||||
// Runs the scenario for the given time.
|
||||
void RunFor(TimeDelta duration);
|
||||
// Runs the scenario until |target_time_since_start|.
|
||||
void RunUntil(TimeDelta target_time_since_start);
|
||||
// Will check |exit_function| every |check_interval|. It stops after a check
|
||||
// if either |target_time_since_start| has passed or if |exit_function|
|
||||
// returns true.
|
||||
// Runs the scenario until |target_time_since_start| or |exit_function|
|
||||
// returns true. |exit_function| is polled after each |check_interval| has
|
||||
// passed.
|
||||
void RunUntil(TimeDelta target_time_since_start,
|
||||
TimeDelta check_interval,
|
||||
std::function<bool()> exit_function);
|
||||
@ -182,14 +164,12 @@ class Scenario {
|
||||
}
|
||||
|
||||
private:
|
||||
TimeDelta TimeUntilTarget(TimeDelta target_time_offset);
|
||||
|
||||
NullReceiver null_receiver_;
|
||||
std::unique_ptr<LogWriterFactoryInterface> log_writer_factory_;
|
||||
const bool real_time_mode_;
|
||||
SimulatedClock sim_clock_;
|
||||
const std::unique_ptr<LogWriterFactoryInterface> log_writer_factory_;
|
||||
std::unique_ptr<TimeController> time_controller_;
|
||||
Clock* clock_;
|
||||
// Event logs use a global clock instance, this is used to override that
|
||||
// instance when not running in real time.
|
||||
rtc::FakeClock event_log_fake_clock_;
|
||||
|
||||
std::vector<std::unique_ptr<CallClient>> clients_;
|
||||
std::vector<std::unique_ptr<CallClientPair>> client_pairs_;
|
||||
@ -200,9 +180,7 @@ class Scenario {
|
||||
|
||||
std::vector<std::unique_ptr<SimulatedTimeClient>> simulated_time_clients_;
|
||||
|
||||
std::vector<std::unique_ptr<RepeatedActivity>> repeated_activities_;
|
||||
std::vector<std::unique_ptr<ActionReceiver>> action_receivers_;
|
||||
std::vector<std::unique_ptr<PendingActivity>> pending_activities_;
|
||||
std::vector<std::unique_ptr<StatesPrinter>> printers_;
|
||||
|
||||
int64_t next_route_id_ = 40000;
|
||||
@ -210,6 +188,8 @@ class Scenario {
|
||||
rtc::scoped_refptr<AudioEncoderFactory> audio_encoder_factory_;
|
||||
|
||||
Timestamp start_time_ = Timestamp::PlusInfinity();
|
||||
// Defined last so it's destroyed first.
|
||||
rtc::TaskQueue task_queue_;
|
||||
};
|
||||
} // namespace test
|
||||
} // namespace webrtc
|
||||
|
||||
@ -7,12 +7,15 @@
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#include <atomic>
|
||||
|
||||
#include "test/scenario/scenario.h"
|
||||
#include "test/gtest.h"
|
||||
namespace webrtc {
|
||||
namespace test {
|
||||
TEST(ScenarioTest, StartsAndStopsWithoutErrors) {
|
||||
std::atomic<bool> packet_received(false);
|
||||
std::atomic<bool> bitrate_changed(false);
|
||||
Scenario s;
|
||||
CallClientConfig call_client_config;
|
||||
call_client_config.transport.rates.start_rate = DataRate::kbps(300);
|
||||
@ -38,10 +41,8 @@ TEST(ScenarioTest, StartsAndStopsWithoutErrors) {
|
||||
CrossTrafficConfig cross_traffic_config;
|
||||
s.CreateCrossTraffic({alice_net}, cross_traffic_config);
|
||||
|
||||
bool packet_received = false;
|
||||
s.NetworkDelayedAction({alice_net, bob_net}, 100,
|
||||
[&packet_received] { packet_received = true; });
|
||||
bool bitrate_changed = false;
|
||||
s.Every(TimeDelta::ms(10), [alice, bob, &bitrate_changed] {
|
||||
if (alice->GetStats().send_bandwidth_bps != 300000 &&
|
||||
bob->GetStats().send_bandwidth_bps != 300000)
|
||||
|
||||
@ -245,6 +245,7 @@ void SimulatedFeedback::OnPacketReceived(EmulatedIpPacket packet) {
|
||||
}
|
||||
|
||||
SimulatedTimeClient::SimulatedTimeClient(
|
||||
TimeController* time_controller,
|
||||
std::unique_ptr<LogWriterFactoryInterface> log_writer_factory,
|
||||
SimulatedTimeClientConfig config,
|
||||
std::vector<PacketStreamConfig> stream_configs,
|
||||
@ -254,7 +255,9 @@ SimulatedTimeClient::SimulatedTimeClient(
|
||||
uint64_t return_receiver_id,
|
||||
Timestamp at_time)
|
||||
: log_writer_factory_(std::move(log_writer_factory)),
|
||||
network_controller_factory_(log_writer_factory_.get(), config.transport),
|
||||
network_controller_factory_(time_controller,
|
||||
log_writer_factory_.get(),
|
||||
config.transport),
|
||||
send_link_(send_link),
|
||||
return_link_(return_link),
|
||||
sender_(send_link.front(), send_receiver_id),
|
||||
|
||||
@ -121,6 +121,7 @@ class SimulatedSender {
|
||||
class SimulatedTimeClient : EmulatedNetworkReceiverInterface {
|
||||
public:
|
||||
SimulatedTimeClient(
|
||||
TimeController* time_controller,
|
||||
std::unique_ptr<LogWriterFactoryInterface> log_writer_factory,
|
||||
SimulatedTimeClientConfig config,
|
||||
std::vector<PacketStreamConfig> stream_configs,
|
||||
|
||||
@ -339,7 +339,8 @@ SendVideoStream::SendVideoStream(CallClient* sender,
|
||||
: sender_(sender), config_(config) {
|
||||
video_capturer_ = absl::make_unique<FrameGeneratorCapturer>(
|
||||
sender_->clock_, CreateFrameGenerator(sender_->clock_, config.source),
|
||||
config.source.framerate);
|
||||
config.source.framerate,
|
||||
*sender->time_controller_->GetTaskQueueFactory());
|
||||
video_capturer_->Init();
|
||||
|
||||
using Encoder = VideoStreamConfig::Encoder;
|
||||
@ -384,59 +385,66 @@ SendVideoStream::SendVideoStream(CallClient* sender,
|
||||
send_config.encoder_settings.bitrate_allocator_factory =
|
||||
bitrate_allocator_factory_.get();
|
||||
|
||||
send_stream_ = sender_->call_->CreateVideoSendStream(
|
||||
std::move(send_config), std::move(encoder_config));
|
||||
std::vector<std::function<void(const VideoFrameQualityInfo&)> >
|
||||
frame_info_handlers;
|
||||
if (config.analyzer.frame_quality_handler)
|
||||
frame_info_handlers.push_back(config.analyzer.frame_quality_handler);
|
||||
sender_->SendTask([&] {
|
||||
send_stream_ = sender_->call_->CreateVideoSendStream(
|
||||
std::move(send_config), std::move(encoder_config));
|
||||
std::vector<std::function<void(const VideoFrameQualityInfo&)> >
|
||||
frame_info_handlers;
|
||||
if (config.analyzer.frame_quality_handler)
|
||||
frame_info_handlers.push_back(config.analyzer.frame_quality_handler);
|
||||
|
||||
if (analyzer->Active()) {
|
||||
frame_tap_.reset(new ForwardingCapturedFrameTap(sender_->clock_, analyzer,
|
||||
video_capturer_.get()));
|
||||
send_stream_->SetSource(frame_tap_.get(),
|
||||
config.encoder.degradation_preference);
|
||||
} else {
|
||||
send_stream_->SetSource(video_capturer_.get(),
|
||||
config.encoder.degradation_preference);
|
||||
}
|
||||
if (analyzer->Active()) {
|
||||
frame_tap_.reset(new ForwardingCapturedFrameTap(sender_->clock_, analyzer,
|
||||
video_capturer_.get()));
|
||||
send_stream_->SetSource(frame_tap_.get(),
|
||||
config.encoder.degradation_preference);
|
||||
} else {
|
||||
send_stream_->SetSource(video_capturer_.get(),
|
||||
config.encoder.degradation_preference);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
SendVideoStream::~SendVideoStream() {
|
||||
sender_->call_->DestroyVideoSendStream(send_stream_);
|
||||
sender_->SendTask(
|
||||
[this] { sender_->call_->DestroyVideoSendStream(send_stream_); });
|
||||
}
|
||||
|
||||
void SendVideoStream::Start() {
|
||||
send_stream_->Start();
|
||||
sender_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
|
||||
sender_->SendTask([this] {
|
||||
send_stream_->Start();
|
||||
sender_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
|
||||
});
|
||||
}
|
||||
|
||||
void SendVideoStream::Stop() {
|
||||
send_stream_->Stop();
|
||||
sender_->SendTask([this] { send_stream_->Stop(); });
|
||||
}
|
||||
|
||||
void SendVideoStream::UpdateConfig(
|
||||
std::function<void(VideoStreamConfig*)> modifier) {
|
||||
rtc::CritScope cs(&crit_);
|
||||
VideoStreamConfig prior_config = config_;
|
||||
modifier(&config_);
|
||||
if (prior_config.encoder.fake.max_rate != config_.encoder.fake.max_rate) {
|
||||
for (auto* encoder : fake_encoders_) {
|
||||
encoder->SetMaxBitrate(config_.encoder.fake.max_rate.kbps());
|
||||
sender_->SendTask([&] {
|
||||
rtc::CritScope cs(&crit_);
|
||||
VideoStreamConfig prior_config = config_;
|
||||
modifier(&config_);
|
||||
if (prior_config.encoder.fake.max_rate != config_.encoder.fake.max_rate) {
|
||||
for (auto* encoder : fake_encoders_) {
|
||||
encoder->SetMaxBitrate(config_.encoder.fake.max_rate.kbps());
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO(srte): Add more conditions that should cause reconfiguration.
|
||||
if (prior_config.encoder.max_framerate != config_.encoder.max_framerate) {
|
||||
VideoEncoderConfig encoder_config = CreateVideoEncoderConfig(config_);
|
||||
send_stream_->ReconfigureVideoEncoder(std::move(encoder_config));
|
||||
}
|
||||
if (prior_config.source.framerate != config_.source.framerate) {
|
||||
SetCaptureFramerate(config_.source.framerate);
|
||||
}
|
||||
// TODO(srte): Add more conditions that should cause reconfiguration.
|
||||
if (prior_config.encoder.max_framerate != config_.encoder.max_framerate) {
|
||||
VideoEncoderConfig encoder_config = CreateVideoEncoderConfig(config_);
|
||||
send_stream_->ReconfigureVideoEncoder(std::move(encoder_config));
|
||||
}
|
||||
if (prior_config.source.framerate != config_.source.framerate) {
|
||||
SetCaptureFramerate(config_.source.framerate);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void SendVideoStream::SetCaptureFramerate(int framerate) {
|
||||
video_capturer_->ChangeFramerate(framerate);
|
||||
sender_->SendTask([&] { video_capturer_->ChangeFramerate(framerate); });
|
||||
}
|
||||
|
||||
VideoSendStream::Stats SendVideoStream::GetStats() const {
|
||||
@ -508,27 +516,35 @@ ReceiveVideoStream::ReceiveVideoStream(CallClient* receiver,
|
||||
MediaType::VIDEO;
|
||||
if (config.stream.use_rtx)
|
||||
receiver_->ssrc_media_types_[recv_config.rtp.rtx_ssrc] = MediaType::VIDEO;
|
||||
receive_streams_.push_back(
|
||||
receiver_->call_->CreateVideoReceiveStream(std::move(recv_config)));
|
||||
receiver_->SendTask([this, &recv_config] {
|
||||
receive_streams_.push_back(
|
||||
receiver_->call_->CreateVideoReceiveStream(std::move(recv_config)));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
ReceiveVideoStream::~ReceiveVideoStream() {
|
||||
for (auto* recv_stream : receive_streams_)
|
||||
receiver_->call_->DestroyVideoReceiveStream(recv_stream);
|
||||
if (flecfec_stream_)
|
||||
receiver_->call_->DestroyFlexfecReceiveStream(flecfec_stream_);
|
||||
receiver_->SendTask([this] {
|
||||
for (auto* recv_stream : receive_streams_)
|
||||
receiver_->call_->DestroyVideoReceiveStream(recv_stream);
|
||||
if (flecfec_stream_)
|
||||
receiver_->call_->DestroyFlexfecReceiveStream(flecfec_stream_);
|
||||
});
|
||||
}
|
||||
|
||||
void ReceiveVideoStream::Start() {
|
||||
for (auto* recv_stream : receive_streams_)
|
||||
recv_stream->Start();
|
||||
receiver_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
|
||||
receiver_->SendTask([this] {
|
||||
for (auto* recv_stream : receive_streams_)
|
||||
recv_stream->Start();
|
||||
receiver_->call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
|
||||
});
|
||||
}
|
||||
|
||||
void ReceiveVideoStream::Stop() {
|
||||
for (auto* recv_stream : receive_streams_)
|
||||
recv_stream->Stop();
|
||||
receiver_->SendTask([this] {
|
||||
for (auto* recv_stream : receive_streams_)
|
||||
recv_stream->Stop();
|
||||
});
|
||||
}
|
||||
|
||||
VideoStreamPair::~VideoStreamPair() = default;
|
||||
@ -541,12 +557,12 @@ VideoStreamPair::VideoStreamPair(
|
||||
: config_(config),
|
||||
analyzer_(std::move(quality_writer),
|
||||
config.analyzer.frame_quality_handler),
|
||||
send_stream_(sender, config, &sender->transport_, &analyzer_),
|
||||
send_stream_(sender, config, sender->transport_.get(), &analyzer_),
|
||||
receive_stream_(receiver,
|
||||
config,
|
||||
&send_stream_,
|
||||
/*chosen_stream=*/0,
|
||||
&receiver->transport_,
|
||||
receiver->transport_.get(),
|
||||
&analyzer_) {}
|
||||
|
||||
} // namespace test
|
||||
|
||||
Reference in New Issue
Block a user