Allow RTP module thread checking to know PacketRouter status.

Since https://webrtc-review.googlesource.com/c/src/+/228433 both audio
and video now only call Get/SetRtpState while not registered to the
packet router.

We can thus remove the lock around packet sequencer and just use a
thread checker.

Bug: webrtc:11340
Change-Id: Ie6865cc96c36208700c31a75747ff4dd992ce68d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228435
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34755}
This commit is contained in:
Erik Språng
2021-08-13 16:12:41 +02:00
committed by WebRTC LUCI CQ
parent 05a9e5abd3
commit b6bbdeb24d
8 changed files with 70 additions and 49 deletions

View File

@ -64,22 +64,22 @@ ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext(
const RtpRtcpInterface::Configuration& config)
: packet_history(config.clock, config.enable_rtx_padding_prioritization),
deferred_sequencing_(config.use_deferred_sequencing),
sequencer_(config.local_media_ssrc,
config.rtx_send_ssrc,
/*require_marker_before_media_padding=*/!config.audio,
config.clock),
sequencer(config.local_media_ssrc,
config.rtx_send_ssrc,
/*require_marker_before_media_padding=*/!config.audio,
config.clock),
packet_sender(config, &packet_history),
non_paced_sender(&packet_sender, this, config.use_deferred_sequencing),
packet_generator(
config,
&packet_history,
config.paced_sender ? config.paced_sender : &non_paced_sender,
config.use_deferred_sequencing ? nullptr : &sequencer_) {}
config.use_deferred_sequencing ? nullptr : &sequencer) {}
void ModuleRtpRtcpImpl2::RtpSenderContext::AssignSequenceNumber(
RtpPacketToSend* packet) {
RTC_DCHECK_RUN_ON(&sequencing_checker);
if (deferred_sequencing_) {
MutexLock lock(&mutex_sequencer_);
sequencer_.Sequence(*packet);
sequencer.Sequence(*packet);
} else {
packet_generator.AssignSequenceNumber(packet);
}
@ -101,10 +101,10 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
rtt_stats_(configuration.rtt_stats),
rtt_ms_(0) {
RTC_DCHECK(worker_queue_);
packet_sequence_checker_.Detach();
pacer_thread_checker_.Detach();
rtcp_thread_checker_.Detach();
if (!configuration.receiver_only) {
rtp_sender_ = std::make_unique<RtpSenderContext>(configuration);
rtp_sender_->sequencing_checker.Detach();
// Make sure rtcp sender use same timestamp offset as rtp sender.
rtcp_sender_.SetTimestampOffset(
rtp_sender_->packet_generator.TimestampOffset());
@ -162,7 +162,7 @@ absl::optional<uint32_t> ModuleRtpRtcpImpl2::FlexfecSsrc() const {
void ModuleRtpRtcpImpl2::IncomingRtcpPacket(const uint8_t* rtcp_packet,
const size_t length) {
RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
rtcp_receiver_.IncomingPacket(rtcp_packet, length);
}
@ -187,20 +187,19 @@ void ModuleRtpRtcpImpl2::SetStartTimestamp(const uint32_t timestamp) {
}
uint16_t ModuleRtpRtcpImpl2::SequenceNumber() const {
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
if (rtp_sender_->deferred_sequencing_) {
MutexLock lock(&rtp_sender_->mutex_sequencer_);
return rtp_sender_->sequencer_.media_sequence_number();
return rtp_sender_->sequencer.media_sequence_number();
}
return rtp_sender_->packet_generator.SequenceNumber();
}
// Set SequenceNumber, default is a random number.
void ModuleRtpRtcpImpl2::SetSequenceNumber(const uint16_t seq_num) {
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
if (rtp_sender_->deferred_sequencing_) {
RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
MutexLock lock(&rtp_sender_->mutex_sequencer_);
if (rtp_sender_->sequencer_.media_sequence_number() != seq_num) {
rtp_sender_->sequencer_.set_media_sequence_number(seq_num);
if (rtp_sender_->sequencer.media_sequence_number() != seq_num) {
rtp_sender_->sequencer.set_media_sequence_number(seq_num);
rtp_sender_->packet_history.Clear();
}
} else {
@ -209,36 +208,36 @@ void ModuleRtpRtcpImpl2::SetSequenceNumber(const uint16_t seq_num) {
}
void ModuleRtpRtcpImpl2::SetRtpState(const RtpState& rtp_state) {
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
rtp_sender_->packet_generator.SetRtpState(rtp_state);
if (rtp_sender_->deferred_sequencing_) {
MutexLock lock(&rtp_sender_->mutex_sequencer_);
rtp_sender_->sequencer_.SetRtpState(rtp_state);
rtp_sender_->sequencer.SetRtpState(rtp_state);
}
rtcp_sender_.SetTimestampOffset(rtp_state.start_timestamp);
}
void ModuleRtpRtcpImpl2::SetRtxState(const RtpState& rtp_state) {
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
rtp_sender_->packet_generator.SetRtxRtpState(rtp_state);
if (rtp_sender_->deferred_sequencing_) {
MutexLock lock(&rtp_sender_->mutex_sequencer_);
rtp_sender_->sequencer_.set_rtx_sequence_number(rtp_state.sequence_number);
rtp_sender_->sequencer.set_rtx_sequence_number(rtp_state.sequence_number);
}
}
RtpState ModuleRtpRtcpImpl2::GetRtpState() const {
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
RtpState state = rtp_sender_->packet_generator.GetRtpState();
if (rtp_sender_->deferred_sequencing_) {
MutexLock lock(&rtp_sender_->mutex_sequencer_);
rtp_sender_->sequencer_.PopulateRtpState(state);
rtp_sender_->sequencer.PopulateRtpState(state);
}
return state;
}
RtpState ModuleRtpRtcpImpl2::GetRtxState() const {
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
RtpState state = rtp_sender_->packet_generator.GetRtxRtpState();
if (rtp_sender_->deferred_sequencing_) {
MutexLock lock(&rtp_sender_->mutex_sequencer_);
state.sequence_number = rtp_sender_->sequencer_.rtx_sequence_number();
state.sequence_number = rtp_sender_->sequencer.rtx_sequence_number();
}
return state;
}
@ -249,7 +248,7 @@ void ModuleRtpRtcpImpl2::SetNonSenderRttMeasurement(bool enabled) {
}
uint32_t ModuleRtpRtcpImpl2::local_media_ssrc() const {
RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
RTC_DCHECK_EQ(rtcp_receiver_.local_media_ssrc(), rtcp_sender_.SSRC());
return rtcp_receiver_.local_media_ssrc();
}
@ -335,12 +334,6 @@ bool ModuleRtpRtcpImpl2::Sending() const {
// updated.
void ModuleRtpRtcpImpl2::SetSendingMediaStatus(const bool sending) {
if (rtp_sender_) {
// Turning on or off sending status indicates module being set
// up or torn down, detach thread checker since subsequent calls
// may be from a different thread.
if (rtp_sender_->packet_generator.SendingMedia() != sending) {
pacer_thread_checker_.Detach();
}
rtp_sender_->packet_generator.SetSendingMediaStatus(sending);
} else {
RTC_DCHECK(!sending);
@ -389,15 +382,14 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp,
bool ModuleRtpRtcpImpl2::TrySendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK(rtp_sender_);
RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
if (rtp_sender_->deferred_sequencing_) {
if (!rtp_sender_->packet_generator.SendingMedia()) {
return false;
}
MutexLock lock(&rtp_sender_->mutex_sequencer_);
if (packet->packet_type() == RtpPacketMediaType::kPadding &&
packet->Ssrc() == rtp_sender_->packet_generator.SSRC() &&
!rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()) {
!rtp_sender_->sequencer.CanSendPaddingOnMediaSsrc()) {
// New media packet preempted this generated padding packet, discard it.
return false;
}
@ -405,7 +397,7 @@ bool ModuleRtpRtcpImpl2::TrySendPacket(RtpPacketToSend* packet,
packet->packet_type() == RtpPacketMediaType::kForwardErrorCorrection &&
packet->Ssrc() == rtp_sender_->packet_generator.FlexfecSsrc();
if (!is_flexfec) {
rtp_sender_->sequencer_.Sequence(*packet);
rtp_sender_->sequencer.Sequence(*packet);
}
} else if (!rtp_sender_->packet_generator.SendingMedia()) {
return false;
@ -425,7 +417,7 @@ void ModuleRtpRtcpImpl2::SetFecProtectionParams(
std::vector<std::unique_ptr<RtpPacketToSend>>
ModuleRtpRtcpImpl2::FetchFecPackets() {
RTC_DCHECK(rtp_sender_);
RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
auto fec_packets = rtp_sender_->packet_sender.FetchFecPackets();
if (!fec_packets.empty() && !rtp_sender_->deferred_sequencing_) {
// Only assign sequence numbers for FEC packets in non-deferred mode, and
@ -460,17 +452,15 @@ bool ModuleRtpRtcpImpl2::SupportsRtxPayloadPadding() const {
std::vector<std::unique_ptr<RtpPacketToSend>>
ModuleRtpRtcpImpl2::GeneratePadding(size_t target_size_bytes) {
RTC_DCHECK(rtp_sender_);
RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
MutexLock lock(&rtp_sender_->mutex_sequencer_);
RTC_DCHECK_RUN_ON(&rtp_sender_->sequencing_checker);
// `can_send_padding_on_media_ssrc` set to false when deferred sequencing
// is off. It will be ignored in that case, RTPSender will internally query
// `sequencer_` while holding the send lock instead.
// `sequencer` while holding the send lock instead.
return rtp_sender_->packet_generator.GeneratePadding(
target_size_bytes, rtp_sender_->packet_sender.MediaHasBeenSent(),
rtp_sender_->deferred_sequencing_
? rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()
? rtp_sender_->sequencer.CanSendPaddingOnMediaSsrc()
: false);
}
@ -488,6 +478,11 @@ size_t ModuleRtpRtcpImpl2::ExpectedPerPacketOverhead() const {
return rtp_sender_->packet_generator.ExpectedPerPacketOverhead();
}
void ModuleRtpRtcpImpl2::OnPacketSendingThreadSwitched() {
// Ownership of sequencing is being transferred to another thread.
rtp_sender_->sequencing_checker.Detach();
}
size_t ModuleRtpRtcpImpl2::MaxRtpPacketSize() const {
RTC_DCHECK(rtp_sender_);
return rtp_sender_->packet_generator.MaxRtpPacketSize();
@ -730,7 +725,7 @@ void ModuleRtpRtcpImpl2::SetRemoteSSRC(const uint32_t ssrc) {
}
void ModuleRtpRtcpImpl2::SetLocalSsrc(uint32_t local_ssrc) {
RTC_DCHECK_RUN_ON(&packet_sequence_checker_);
RTC_DCHECK_RUN_ON(&rtcp_thread_checker_);
rtcp_receiver_.set_local_media_ssrc(local_ssrc);
rtcp_sender_.SetSsrc(local_ssrc);
}