Enable thread check in Call::GetStats().

I've updated all the tests that previously were calling this method on
the wrong thread, so we can enable this check now.

I've also landed some changes that simplify the threading model in this
class and subsequently I've removed some locks and can remove some more
in this CL.

Added some comments about future improvements for GetStats() to reduce
synchronization.

Simplified CallStats::OnRttUpdate() to have one fewer async methods.

Bug: webrtc:10847
Change-Id: I48e6809172142cc4be4385b7d4aa2affb52a963a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/148588
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28821}
This commit is contained in:
Tommi
2019-08-09 11:42:32 +02:00
committed by Commit Bot
parent e4ba4ee837
commit 48b48e5cc9
2 changed files with 59 additions and 48 deletions

View File

@ -255,6 +255,10 @@ class Call final : public webrtc::Call,
void RegisterRateObserver();
rtc::TaskQueue* network_queue() const {
return transport_send_ptr_->GetWorkerQueue();
}
Clock* const clock_;
TaskQueueFactory* const task_queue_factory_;
@ -268,11 +272,7 @@ class Call final : public webrtc::Call,
NetworkState audio_network_state_;
NetworkState video_network_state_;
// TODO(tommi): Once tests have been fixed to not call GetStats() on the wrong
// thread, remove this lock and protect aggregate_network_up_crit_ with the
// configuration_sequence_checker_.
rtc::CriticalSection aggregate_network_up_crit_;
bool aggregate_network_up_ RTC_GUARDED_BY(aggregate_network_up_crit_);
bool aggregate_network_up_ RTC_GUARDED_BY(configuration_sequence_checker_);
std::unique_ptr<RWLockWrapper> receive_crit_;
// Audio, Video, and FlexFEC receive streams are owned by the client that
@ -466,6 +466,13 @@ Call::Call(Clock* clock,
transport_send_(std::move(transport_send)) {
RTC_DCHECK(config.event_log != nullptr);
worker_sequence_checker_.Detach();
call_stats_->RegisterStatsObserver(&receive_side_cc_);
module_process_thread_->RegisterModule(
receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE);
module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE);
}
Call::~Call() {
@ -477,30 +484,27 @@ Call::~Call() {
RTC_CHECK(audio_receive_streams_.empty());
RTC_CHECK(video_receive_streams_.empty());
module_process_thread_->Stop();
module_process_thread_->DeRegisterModule(
receive_side_cc_.GetRemoteBitrateEstimator(true));
module_process_thread_->DeRegisterModule(&receive_side_cc_);
module_process_thread_->DeRegisterModule(call_stats_.get());
module_process_thread_->Stop();
call_stats_->DeregisterStatsObserver(&receive_side_cc_);
absl::optional<Timestamp> first_sent_packet_ms =
transport_send_->GetFirstPacketTime();
// Only update histograms after process threads have been shut down, so that
// they won't try to concurrently update stats.
if (first_sent_packet_ms) {
rtc::CritScope lock(&bitrate_crit_);
UpdateSendHistograms(*first_sent_packet_ms);
}
UpdateReceiveHistograms();
UpdateHistograms();
}
// TODO(tommi): Most of this work could be done when Call gets created.
// Starting the process thread itself could be done on demand when streams
// are created and in that case, calling Start() multiple times is harmless
// so holding an extra state variable, |is_target_rate_observer_registered_|
// also shouldn't be necessary.
void Call::RegisterRateObserver() {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
@ -509,14 +513,10 @@ void Call::RegisterRateObserver() {
is_target_rate_observer_registered_ = true;
// This call seems to kick off a number of things, so probably better left
// off being kicked off on request rather than in the ctor.
transport_send_ptr_->RegisterTargetTransferRateObserver(this);
call_stats_->RegisterStatsObserver(&receive_side_cc_);
module_process_thread_->RegisterModule(
receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
module_process_thread_->RegisterModule(call_stats_.get(), RTC_FROM_HERE);
module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE);
module_process_thread_->Start();
}
@ -531,6 +531,7 @@ void Call::UpdateHistograms() {
(clock_->TimeInMilliseconds() - start_ms_) / 1000);
}
// Called from the dtor.
void Call::UpdateSendHistograms(Timestamp first_sent_packet) {
int64_t elapsed_sec =
(clock_->TimeInMilliseconds() - first_sent_packet.ms()) / 1000;
@ -950,35 +951,41 @@ RtpTransportControllerSendInterface* Call::GetTransportControllerSend() {
}
Call::Stats Call::GetStats() const {
// TODO(solenberg): Some test cases in EndToEndTest use this from a different
// thread. Re-enable once that is fixed.
// RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_);
// TODO(tommi): The following stats are managed on the process thread:
// - pacer_delay_ms (PacedSender::Process)
// - rtt_ms
// - recv_bandwidth_bps
// These are delivered on the network TQ:
// - send_bandwidth_bps (see OnTargetTransferRate)
// - max_padding_bitrate_bps (see OnAllocationLimitsChanged)
Stats stats;
// TODO(srte): It is unclear if we only want to report queues if network is
// available.
stats.pacer_delay_ms =
aggregate_network_up_ ? transport_send_ptr_->GetPacerQueuingDelayMs() : 0;
stats.rtt_ms = call_stats_->LastProcessedRtt();
// Fetch available send/receive bitrates.
std::vector<unsigned int> ssrcs;
uint32_t recv_bandwidth = 0;
receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate(
&ssrcs, &recv_bandwidth);
stats.recv_bandwidth_bps = recv_bandwidth;
{
rtc::CritScope cs(&last_bandwidth_bps_crit_);
stats.send_bandwidth_bps = last_bandwidth_bps_;
}
stats.recv_bandwidth_bps = recv_bandwidth;
// TODO(srte): It is unclear if we only want to report queues if network is
// available.
{
rtc::CritScope cs(&aggregate_network_up_crit_);
stats.pacer_delay_ms = aggregate_network_up_
? transport_send_ptr_->GetPacerQueuingDelayMs()
: 0;
}
stats.rtt_ms = call_stats_->LastProcessedRtt();
{
rtc::CritScope cs(&bitrate_crit_);
stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
}
return stats;
}
@ -1048,10 +1055,8 @@ void Call::UpdateAggregateNetworkState() {
RTC_LOG(LS_INFO) << "UpdateAggregateNetworkState: aggregate_state="
<< (aggregate_network_up ? "up" : "down");
{
rtc::CritScope cs(&aggregate_network_up_crit_);
aggregate_network_up_ = aggregate_network_up;
}
aggregate_network_up_ = aggregate_network_up;
transport_send_ptr_->OnNetworkAvailability(aggregate_network_up);
}
@ -1062,16 +1067,12 @@ void Call::OnSentPacket(const rtc::SentPacket& sent_packet) {
}
void Call::OnStartRateUpdate(DataRate start_rate) {
if (!transport_send_ptr_->GetWorkerQueue()->IsCurrent()) {
transport_send_ptr_->GetWorkerQueue()->PostTask(
[this, start_rate] { this->OnStartRateUpdate(start_rate); });
return;
}
RTC_DCHECK(network_queue()->IsCurrent());
bitrate_allocator_->UpdateStartRate(start_rate.bps<uint32_t>());
}
void Call::OnTargetTransferRate(TargetTransferRate msg) {
RTC_DCHECK(transport_send_ptr_->GetWorkerQueue()->IsCurrent());
RTC_DCHECK(network_queue()->IsCurrent());
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
uint32_t target_bitrate_bps = msg.target_rate.bps();
@ -1122,7 +1123,9 @@ void Call::OnTargetTransferRate(TargetTransferRate msg) {
void Call::OnAllocationLimitsChanged(uint32_t min_send_bitrate_bps,
uint32_t max_padding_bitrate_bps,
uint32_t total_bitrate_bps) {
RTC_DCHECK(network_queue()->IsCurrent());
RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
transport_send_ptr_->SetAllocatedSendBitrateLimits(
min_send_bitrate_bps, max_padding_bitrate_bps, total_bitrate_bps);