Makes critsect_.Leave() more visible in PacedSender.

This means that the PacedSender::Process function becomes slightly
larger, however, it makes it much more obvious to the reader where
the locks are held or not. Confusion over this has previously caused
bugs.

Bug: webrtc:9870
Change-Id: I63257eae59ecf5e7dd28ea24f63157cefe9f81bd
Reviewed-on: https://webrtc-review.googlesource.com/c/105460
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25389}
This commit is contained in:
Sebastian Jansson
2018-10-26 13:10:23 +02:00
committed by Commit Bot
parent 6dd7f9120e
commit 916ae08a04
2 changed files with 79 additions and 56 deletions

View File

@ -11,6 +11,7 @@
#include "modules/pacing/paced_sender.h" #include "modules/pacing/paced_sender.h"
#include <algorithm> #include <algorithm>
#include <utility>
#include "absl/memory/memory.h" #include "absl/memory/memory.h"
#include "logging/rtc_event_log/rtc_event_log.h" #include "logging/rtc_event_log/rtc_event_log.h"
@ -257,9 +258,7 @@ int64_t PacedSender::TimeUntilNextProcess() {
return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0); return std::max<int64_t>(kMinPacketLimitMs - elapsed_time_ms, 0);
} }
void PacedSender::Process() { int64_t PacedSender::UpdateTimeAndGetElapsedMs(int64_t now_us) {
int64_t now_us = clock_->TimeInMicroseconds();
rtc::CritScope cs(&critsect_);
int64_t elapsed_time_ms = (now_us - time_last_process_us_ + 500) / 1000; int64_t elapsed_time_ms = (now_us - time_last_process_us_ + 500) / 1000;
time_last_process_us_ = now_us; time_last_process_us_ = now_us;
if (elapsed_time_ms > kMaxElapsedTimeMs) { if (elapsed_time_ms > kMaxElapsedTimeMs) {
@ -268,6 +267,10 @@ void PacedSender::Process() {
<< kMaxElapsedTimeMs << " ms"; << kMaxElapsedTimeMs << " ms";
elapsed_time_ms = kMaxElapsedTimeMs; elapsed_time_ms = kMaxElapsedTimeMs;
} }
return elapsed_time_ms;
}
bool PacedSender::ShouldSendKeepalive(int64_t now_us) const {
if (send_padding_if_silent_ || paused_ || Congested()) { if (send_padding_if_silent_ || paused_ || Congested()) {
// We send a padding packet every 500 ms to ensure we won't get stuck in // We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received. // congested state due to no feedback being received.
@ -276,12 +279,25 @@ void PacedSender::Process() {
// We can not send padding unless a normal packet has first been sent. If // We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up. // we do, timestamps get messed up.
if (packet_counter_ > 0) { if (packet_counter_ > 0) {
PacedPacketInfo pacing_info; return true;
size_t bytes_sent = SendPadding(1, pacing_info);
alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);
} }
} }
} }
return false;
}
void PacedSender::Process() {
rtc::CritScope cs(&critsect_);
int64_t now_us = clock_->TimeInMicroseconds();
int64_t elapsed_time_ms = UpdateTimeAndGetElapsedMs(now_us);
if (ShouldSendKeepalive(now_us)) {
critsect_.Leave();
size_t bytes_sent = packet_sender_->TimeToSendPadding(1, PacedPacketInfo());
critsect_.Enter();
OnPaddingSent(bytes_sent);
alr_detector_->OnBytesSent(bytes_sent, now_us / 1000);
}
if (paused_) if (paused_)
return; return;
@ -315,23 +331,27 @@ void PacedSender::Process() {
pacing_info = prober_.CurrentCluster(); pacing_info = prober_.CurrentCluster();
recommended_probe_size = prober_.RecommendedMinProbeSize(); recommended_probe_size = prober_.RecommendedMinProbeSize();
} }
// The paused state is checked in the loop since SendPacket leaves the // The paused state is checked in the loop since it leaves the critical
// critical section allowing the paused state to be changed from other code. // section allowing the paused state to be changed from other code.
while (!packets_.Empty() && !paused_) { while (!packets_.Empty() && !paused_) {
// Since we need to release the lock in order to send, we first pop the const auto* packet = GetPendingPacket(pacing_info);
// element from the priority queue but keep it in storage, so that we can if (packet == nullptr)
// reinsert it if send fails. break;
const RoundRobinPacketQueue::Packet& packet = packets_.BeginPop();
if (SendPacket(packet, pacing_info)) { critsect_.Leave();
bytes_sent += packet.bytes; bool success = packet_sender_->TimeToSendPacket(
packet->ssrc, packet->sequence_number, packet->capture_time_ms,
packet->retransmission, pacing_info);
critsect_.Enter();
if (success) {
bytes_sent += packet->bytes;
// Send succeeded, remove it from the queue. // Send succeeded, remove it from the queue.
packets_.FinalizePop(packet); OnPacketSent(std::move(packet));
if (is_probing && bytes_sent > recommended_probe_size) if (is_probing && bytes_sent > recommended_probe_size)
break; break;
} else { } else {
// Send failed, put it back into the queue. // Send failed, put it back into the queue.
packets_.CancelPop(packet); packets_.CancelPop(*packet);
break; break;
} }
} }
@ -344,7 +364,12 @@ void PacedSender::Process() {
static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent) static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
: padding_budget_.bytes_remaining()); : padding_budget_.bytes_remaining());
if (padding_needed > 0) { if (padding_needed > 0) {
bytes_sent += SendPadding(padding_needed, pacing_info); critsect_.Leave();
size_t padding_sent =
packet_sender_->TimeToSendPadding(padding_needed, pacing_info);
critsect_.Enter();
bytes_sent += padding_sent;
OnPaddingSent(padding_sent);
} }
} }
} }
@ -362,54 +387,46 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
process_thread_ = process_thread; process_thread_ = process_thread;
} }
bool PacedSender::SendPacket(const RoundRobinPacketQueue::Packet& packet, const RoundRobinPacketQueue::Packet* PacedSender::GetPendingPacket(
const PacedPacketInfo& pacing_info) { const PacedPacketInfo& pacing_info) {
RTC_DCHECK(!paused_); // Since we need to release the lock in order to send, we first pop the
bool audio_packet = packet.priority == kHighPriority; // element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
const RoundRobinPacketQueue::Packet* packet = &packets_.BeginPop();
bool audio_packet = packet->priority == kHighPriority;
bool apply_pacing = bool apply_pacing =
!audio_packet || account_for_audio_ || video_blocks_audio_; !audio_packet || account_for_audio_ || video_blocks_audio_;
if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 && if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
pacing_info.probe_cluster_id == pacing_info.probe_cluster_id ==
PacedPacketInfo::kNotAProbe))) { PacedPacketInfo::kNotAProbe))) {
return false; packets_.CancelPop(*packet);
return nullptr;
} }
return packet;
critsect_.Leave();
const bool success = packet_sender_->TimeToSendPacket(
packet.ssrc, packet.sequence_number, packet.capture_time_ms,
packet.retransmission, pacing_info);
critsect_.Enter();
if (success) {
if (first_sent_packet_ms_ == -1)
first_sent_packet_ms_ = TimeMilliseconds();
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
// TODO(eladalon): TimeToSendPacket() can also return |true| in some
// situations where nothing actually ended up being sent to the network,
// and we probably don't want to update the budget in such cases.
// https://bugs.chromium.org/p/webrtc/issues/detail?id=8052
UpdateBudgetWithBytesSent(packet.bytes);
last_send_time_us_ = clock_->TimeInMicroseconds();
}
}
return success;
} }
size_t PacedSender::SendPadding(size_t padding_needed, void PacedSender::OnPacketSent(const RoundRobinPacketQueue::Packet* packet) {
const PacedPacketInfo& pacing_info) { if (first_sent_packet_ms_ == -1)
RTC_DCHECK_GT(packet_counter_, 0); first_sent_packet_ms_ = TimeMilliseconds();
critsect_.Leave(); bool audio_packet = packet->priority == kHighPriority;
size_t bytes_sent = if (!audio_packet || account_for_audio_) {
packet_sender_->TimeToSendPadding(padding_needed, pacing_info); // Update media bytes sent.
critsect_.Enter(); // TODO(eladalon): TimeToSendPacket() can also return |true| in some
// situations where nothing actually ended up being sent to the network,
// and we probably don't want to update the budget in such cases.
// https://bugs.chromium.org/p/webrtc/issues/detail?id=8052
UpdateBudgetWithBytesSent(packet->bytes);
last_send_time_us_ = clock_->TimeInMicroseconds();
}
// Send succeeded, remove it from the queue.
packets_.FinalizePop(*packet);
}
void PacedSender::OnPaddingSent(size_t bytes_sent) {
if (bytes_sent > 0) { if (bytes_sent > 0) {
UpdateBudgetWithBytesSent(bytes_sent); UpdateBudgetWithBytesSent(bytes_sent);
} }
last_send_time_us_ = clock_->TimeInMicroseconds(); last_send_time_us_ = clock_->TimeInMicroseconds();
return bytes_sent;
} }
void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) { void PacedSender::UpdateBudgetWithElapsedTime(int64_t delta_time_ms) {

View File

@ -143,19 +143,25 @@ class PacedSender : public Pacer {
void SetQueueTimeLimit(int limit_ms); void SetQueueTimeLimit(int limit_ms);
private: private:
int64_t UpdateTimeAndGetElapsedMs(int64_t now_us)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool ShouldSendKeepalive(int64_t at_time_us) const
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
// Updates the number of bytes that can be sent for the next time interval. // Updates the number of bytes that can be sent for the next time interval.
void UpdateBudgetWithElapsedTime(int64_t delta_time_in_ms) void UpdateBudgetWithElapsedTime(int64_t delta_time_in_ms)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void UpdateBudgetWithBytesSent(size_t bytes) void UpdateBudgetWithBytesSent(size_t bytes)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool SendPacket(const RoundRobinPacketQueue::Packet& packet, const RoundRobinPacketQueue::Packet* GetPendingPacket(
const PacedPacketInfo& cluster_info) const PacedPacketInfo& pacing_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info) void OnPacketSent(const RoundRobinPacketQueue::Packet* packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void OnPaddingSent(size_t padding_sent)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void OnBytesSent(size_t bytes_sent) RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
int64_t TimeMilliseconds() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); int64_t TimeMilliseconds() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);