diff --git a/test/scenario/network/BUILD.gn b/test/scenario/network/BUILD.gn index 0bcd65d999..1f690c5994 100644 --- a/test/scenario/network/BUILD.gn +++ b/test/scenario/network/BUILD.gn @@ -46,6 +46,7 @@ rtc_source_set("emulated_network") { "../../../rtc_base:rtc_task_queue", "../../../rtc_base:safe_minmax", "../../../rtc_base:task_queue_for_test", + "../../../rtc_base/synchronization:sequence_checker", "../../../rtc_base/task_utils:repeating_task", "../../../rtc_base/third_party/sigslot", "../../../system_wrappers", diff --git a/test/scenario/network/cross_traffic.cc b/test/scenario/network/cross_traffic.cc index bd63a08c1a..1d4efef866 100644 --- a/test/scenario/network/cross_traffic.cc +++ b/test/scenario/network/cross_traffic.cc @@ -25,10 +25,13 @@ RandomWalkCrossTraffic::RandomWalkCrossTraffic(RandomWalkConfig config, TrafficRoute* traffic_route) : config_(config), traffic_route_(traffic_route), - random_(config_.random_seed) {} + random_(config_.random_seed) { + sequence_checker_.Detach(); +} RandomWalkCrossTraffic::~RandomWalkCrossTraffic() = default; void RandomWalkCrossTraffic::Process(Timestamp at_time) { + RTC_DCHECK_RUN_ON(&sequence_checker_); if (last_process_time_.IsMinusInfinity()) { last_process_time_ = at_time; } @@ -52,6 +55,7 @@ void RandomWalkCrossTraffic::Process(Timestamp at_time) { } DataRate RandomWalkCrossTraffic::TrafficRate() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); return config_.peak_rate * intensity_; } @@ -66,10 +70,13 @@ ColumnPrinter RandomWalkCrossTraffic::StatsPrinter() { PulsedPeaksCrossTraffic::PulsedPeaksCrossTraffic(PulsedPeaksConfig config, TrafficRoute* traffic_route) - : config_(config), traffic_route_(traffic_route) {} + : config_(config), traffic_route_(traffic_route) { + sequence_checker_.Detach(); +} PulsedPeaksCrossTraffic::~PulsedPeaksCrossTraffic() = default; void PulsedPeaksCrossTraffic::Process(Timestamp at_time) { + RTC_DCHECK_RUN_ON(&sequence_checker_); TimeDelta time_since_toggle = at_time - last_update_time_; if (time_since_toggle.IsInfinite() || (sending_ && time_since_toggle >= config_.send_duration)) { @@ -94,6 +101,7 @@ void PulsedPeaksCrossTraffic::Process(Timestamp at_time) { } DataRate PulsedPeaksCrossTraffic::TrafficRate() const { + RTC_DCHECK_RUN_ON(&sequence_checker_); return sending_ ? config_.peak_rate : DataRate::Zero(); } diff --git a/test/scenario/network/cross_traffic.h b/test/scenario/network/cross_traffic.h index b5a65fb23e..e88827466c 100644 --- a/test/scenario/network/cross_traffic.h +++ b/test/scenario/network/cross_traffic.h @@ -18,6 +18,7 @@ #include "api/units/time_delta.h" #include "api/units/timestamp.h" #include "rtc_base/random.h" +#include "rtc_base/synchronization/sequence_checker.h" #include "test/scenario/column_printer.h" #include "test/scenario/network/traffic_route.h" @@ -44,15 +45,19 @@ class RandomWalkCrossTraffic { ColumnPrinter StatsPrinter(); private: - RandomWalkConfig config_; - TrafficRoute* const traffic_route_; - webrtc::Random random_; + SequenceChecker sequence_checker_; + const RandomWalkConfig config_; + TrafficRoute* const traffic_route_ RTC_PT_GUARDED_BY(sequence_checker_); + webrtc::Random random_ RTC_GUARDED_BY(sequence_checker_); - Timestamp last_process_time_ = Timestamp::MinusInfinity(); - Timestamp last_update_time_ = Timestamp::MinusInfinity(); - Timestamp last_send_time_ = Timestamp::MinusInfinity(); - double intensity_ = 0; - DataSize pending_size_ = DataSize::Zero(); + Timestamp last_process_time_ RTC_GUARDED_BY(sequence_checker_) = + Timestamp::MinusInfinity(); + Timestamp last_update_time_ RTC_GUARDED_BY(sequence_checker_) = + Timestamp::MinusInfinity(); + Timestamp last_send_time_ RTC_GUARDED_BY(sequence_checker_) = + Timestamp::MinusInfinity(); + double intensity_ RTC_GUARDED_BY(sequence_checker_) = 0; + DataSize pending_size_ RTC_GUARDED_BY(sequence_checker_) = DataSize::Zero(); }; struct PulsedPeaksConfig { @@ -74,12 +79,15 @@ class PulsedPeaksCrossTraffic { ColumnPrinter StatsPrinter(); private: - PulsedPeaksConfig config_; - TrafficRoute* const traffic_route_; + SequenceChecker sequence_checker_; + const PulsedPeaksConfig config_; + TrafficRoute* const traffic_route_ RTC_PT_GUARDED_BY(sequence_checker_); - Timestamp last_update_time_ = Timestamp::MinusInfinity(); - Timestamp last_send_time_ = Timestamp::MinusInfinity(); - bool sending_ = false; + Timestamp last_update_time_ RTC_GUARDED_BY(sequence_checker_) = + Timestamp::MinusInfinity(); + Timestamp last_send_time_ RTC_GUARDED_BY(sequence_checker_) = + Timestamp::MinusInfinity(); + bool sending_ RTC_GUARDED_BY(sequence_checker_) = false; }; } // namespace test diff --git a/test/scenario/network/network_emulation_manager.cc b/test/scenario/network/network_emulation_manager.cc index ae67f6fb5e..b132e808bc 100644 --- a/test/scenario/network/network_emulation_manager.cc +++ b/test/scenario/network/network_emulation_manager.cc @@ -23,12 +23,33 @@ namespace webrtc { namespace test { namespace { -constexpr int64_t kPacketProcessingIntervalMs = 1; // uint32_t representation of 192.168.0.0 address constexpr uint32_t kMinIPv4Address = 0xC0A80000; // uint32_t representation of 192.168.255.255 address constexpr uint32_t kMaxIPv4Address = 0xC0A8FFFF; +template +class ResourceOwningTask final : public QueuedTask { + public: + ResourceOwningTask(T&& resource, Closure&& handler) + : resource_(std::move(resource)), + handler_(std::forward(handler)) {} + + bool Run() override { + handler_(std::move(resource_)); + return true; + } + + private: + T resource_; + Closure handler_; +}; +template +std::unique_ptr CreateResourceOwningTask(T resource, + Closure&& closure) { + return absl::make_unique>( + std::forward(resource), std::forward(closure)); +} } // namespace NetworkEmulationManagerImpl::NetworkEmulationManagerImpl() @@ -42,10 +63,6 @@ NetworkEmulationManagerImpl::NetworkEmulationManagerImpl( task_queue_(time_controller->GetTaskQueueFactory()->CreateTaskQueue( "NetworkEmulation", TaskQueueFactory::Priority::NORMAL)) { - process_task_handle_ = RepeatingTaskHandle::Start(task_queue_.Get(), [this] { - ProcessNetworkPackets(); - return TimeDelta::ms(kPacketProcessingIntervalMs); - }); } // TODO(srte): Ensure that any pending task that must be run for consistency @@ -170,17 +187,21 @@ RandomWalkCrossTraffic* NetworkEmulationManagerImpl::CreateRandomWalkCrossTraffic( TrafficRoute* traffic_route, RandomWalkConfig config) { - auto traffic = absl::make_unique(std::move(config), - traffic_route); + auto traffic = + absl::make_unique(config, traffic_route); RandomWalkCrossTraffic* out = traffic.get(); - struct Closure { - void operator()() { - manager->random_cross_traffics_.push_back(std::move(traffic)); - } - NetworkEmulationManagerImpl* manager; - std::unique_ptr traffic; - }; - task_queue_.PostTask(Closure{this, std::move(traffic)}); + + task_queue_.PostTask(CreateResourceOwningTask( + std::move(traffic), + [this, config](std::unique_ptr traffic) { + auto* traffic_ptr = traffic.get(); + random_cross_traffics_.push_back(std::move(traffic)); + RepeatingTaskHandle::Start(task_queue_.Get(), + [this, config, traffic_ptr] { + traffic_ptr->Process(Now()); + return config.min_packet_interval; + }); + })); return out; } @@ -188,17 +209,20 @@ PulsedPeaksCrossTraffic* NetworkEmulationManagerImpl::CreatePulsedPeaksCrossTraffic( TrafficRoute* traffic_route, PulsedPeaksConfig config) { - auto traffic = absl::make_unique(std::move(config), - traffic_route); + auto traffic = + absl::make_unique(config, traffic_route); PulsedPeaksCrossTraffic* out = traffic.get(); - struct Closure { - void operator()() { - manager->pulsed_cross_traffics_.push_back(std::move(traffic)); - } - NetworkEmulationManagerImpl* manager; - std::unique_ptr traffic; - }; - task_queue_.PostTask(Closure{this, std::move(traffic)}); + task_queue_.PostTask(CreateResourceOwningTask( + std::move(traffic), + [this, config](std::unique_ptr traffic) { + auto* traffic_ptr = traffic.get(); + pulsed_cross_traffics_.push_back(std::move(traffic)); + RepeatingTaskHandle::Start(task_queue_.Get(), + [this, config, traffic_ptr] { + traffic_ptr->Process(Now()); + return config.min_packet_interval; + }); + })); return out; } @@ -242,16 +266,6 @@ NetworkEmulationManagerImpl::GetNextIPv4Address() { return absl::nullopt; } -void NetworkEmulationManagerImpl::ProcessNetworkPackets() { - Timestamp current_time = Now(); - for (auto& traffic : random_cross_traffics_) { - traffic->Process(current_time); - } - for (auto& traffic : pulsed_cross_traffics_) { - traffic->Process(current_time); - } -} - Timestamp NetworkEmulationManagerImpl::Now() const { return Timestamp::us(clock_->TimeInMicroseconds()); } diff --git a/test/scenario/network/network_emulation_manager.h b/test/scenario/network/network_emulation_manager.h index b3fd2a5c3e..72f15fdbf4 100644 --- a/test/scenario/network/network_emulation_manager.h +++ b/test/scenario/network/network_emulation_manager.h @@ -69,7 +69,6 @@ class NetworkEmulationManagerImpl : public NetworkEmulationManager { FakeNetworkSocketServer* CreateSocketServer( const std::vector& endpoints); absl::optional GetNextIPv4Address(); - void ProcessNetworkPackets(); Timestamp Now() const; Clock* const clock_;