Add support for deferred sequence numbering.

With this turned on, packets will be sequence number after the pacing
stage rather that during packetization.
This avoids a race where packets may be sent out of order, and paves
the way for the ability to cull packets from the pacer queue without
causing sequence number gaps.

For now, the feature is off by default. Follow-ups will enable it for
video and audio separately.

Bug: webrtc:11340, webrtc:12470
Change-Id: I6d411d8c85b9047e3e9b05ff4c2c3ed97c579aa1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/208584
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34661}
This commit is contained in:
Erik Språng
2021-08-06 13:10:11 +02:00
committed by WebRTC LUCI CQ
parent 82c2248511
commit bb90497eaa
11 changed files with 164 additions and 41 deletions

View File

@ -63,20 +63,25 @@ int DelayMillisForDuration(TimeDelta duration) {
ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext(
const RtpRtcpInterface::Configuration& config)
: packet_history(config.clock, config.enable_rtx_padding_prioritization),
deferred_sequencing_(config.use_deferred_sequencing),
sequencer_(config.local_media_ssrc,
config.rtx_send_ssrc,
/*require_marker_before_media_padding=*/!config.audio,
config.clock),
packet_sender(config, &packet_history),
non_paced_sender(&packet_sender, this),
non_paced_sender(&packet_sender, this, config.use_deferred_sequencing),
packet_generator(
config,
&packet_history,
config.paced_sender ? config.paced_sender : &non_paced_sender,
&sequencer_) {}
config.use_deferred_sequencing ? nullptr : &sequencer_) {}
void ModuleRtpRtcpImpl2::RtpSenderContext::AssignSequenceNumber(
RtpPacketToSend* packet) {
packet_generator.AssignSequenceNumber(packet);
if (deferred_sequencing_) {
sequencer_.Sequence(*packet);
} else {
packet_generator.AssignSequenceNumber(packet);
}
}
ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
@ -96,6 +101,7 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
rtt_ms_(0) {
RTC_DCHECK(worker_queue_);
packet_sequence_checker_.Detach();
pacer_thread_checker_.Detach();
if (!configuration.receiver_only) {
rtp_sender_ = std::make_unique<RtpSenderContext>(configuration);
// Make sure rtcp sender use same timestamp offset as rtp sender.
@ -180,30 +186,54 @@ void ModuleRtpRtcpImpl2::SetStartTimestamp(const uint32_t timestamp) {
}
uint16_t ModuleRtpRtcpImpl2::SequenceNumber() const {
if (rtp_sender_->deferred_sequencing_) {
return rtp_sender_->sequencer_.media_sequence_number();
}
return rtp_sender_->packet_generator.SequenceNumber();
}
// Set SequenceNumber, default is a random number.
void ModuleRtpRtcpImpl2::SetSequenceNumber(const uint16_t seq_num) {
rtp_sender_->packet_generator.SetSequenceNumber(seq_num);
if (rtp_sender_->deferred_sequencing_) {
RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
if (rtp_sender_->sequencer_.media_sequence_number() != seq_num) {
rtp_sender_->sequencer_.set_media_sequence_number(seq_num);
rtp_sender_->packet_history.Clear();
}
} else {
rtp_sender_->packet_generator.SetSequenceNumber(seq_num);
}
}
void ModuleRtpRtcpImpl2::SetRtpState(const RtpState& rtp_state) {
rtp_sender_->packet_generator.SetRtpState(rtp_state);
if (rtp_sender_->deferred_sequencing_) {
rtp_sender_->sequencer_.SetRtpState(rtp_state);
}
rtcp_sender_.SetTimestampOffset(rtp_state.start_timestamp);
}
void ModuleRtpRtcpImpl2::SetRtxState(const RtpState& rtp_state) {
rtp_sender_->packet_generator.SetRtxRtpState(rtp_state);
if (rtp_sender_->deferred_sequencing_) {
rtp_sender_->sequencer_.set_rtx_sequence_number(rtp_state.sequence_number);
}
}
RtpState ModuleRtpRtcpImpl2::GetRtpState() const {
RtpState state = rtp_sender_->packet_generator.GetRtpState();
if (rtp_sender_->deferred_sequencing_) {
rtp_sender_->sequencer_.PopulateRtpState(state);
}
return state;
}
RtpState ModuleRtpRtcpImpl2::GetRtxState() const {
return rtp_sender_->packet_generator.GetRtxRtpState();
RtpState state = rtp_sender_->packet_generator.GetRtxRtpState();
if (rtp_sender_->deferred_sequencing_) {
state.sequence_number = rtp_sender_->sequencer_.rtx_sequence_number();
}
return state;
}
void ModuleRtpRtcpImpl2::SetNonSenderRttMeasurement(bool enabled) {
@ -298,6 +328,12 @@ bool ModuleRtpRtcpImpl2::Sending() const {
// updated.
void ModuleRtpRtcpImpl2::SetSendingMediaStatus(const bool sending) {
if (rtp_sender_) {
// Turning on or off sending status indicates module being set
// up or torn down, detach thread checker since subsequent calls
// may be from a different thread.
if (rtp_sender_->packet_generator.SendingMedia() != sending) {
pacer_thread_checker_.Detach();
}
rtp_sender_->packet_generator.SetSendingMediaStatus(sending);
} else {
RTC_DCHECK(!sending);
@ -346,8 +382,22 @@ bool ModuleRtpRtcpImpl2::OnSendingRtpFrame(uint32_t timestamp,
bool ModuleRtpRtcpImpl2::TrySendPacket(RtpPacketToSend* packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK(rtp_sender_);
// TODO(sprang): Consider if we can remove this check.
if (!rtp_sender_->packet_generator.SendingMedia()) {
RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
if (rtp_sender_->deferred_sequencing_) {
RTC_DCHECK(rtp_sender_->packet_generator.SendingMedia());
if (packet->packet_type() == RtpPacketMediaType::kPadding &&
packet->Ssrc() == rtp_sender_->packet_generator.SSRC() &&
!rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()) {
// New media packet preempted this generated padding packet, discard it.
return false;
}
bool is_flexfec =
packet->packet_type() == RtpPacketMediaType::kForwardErrorCorrection &&
packet->Ssrc() == rtp_sender_->packet_generator.FlexfecSsrc();
if (!is_flexfec) {
rtp_sender_->sequencer_.Sequence(*packet);
}
} else if (!rtp_sender_->packet_generator.SendingMedia()) {
return false;
}
rtp_sender_->packet_sender.SendPacket(packet, pacing_info);
@ -365,9 +415,11 @@ void ModuleRtpRtcpImpl2::SetFecProtectionParams(
std::vector<std::unique_ptr<RtpPacketToSend>>
ModuleRtpRtcpImpl2::FetchFecPackets() {
RTC_DCHECK(rtp_sender_);
RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
auto fec_packets = rtp_sender_->packet_sender.FetchFecPackets();
if (!fec_packets.empty()) {
// Don't assign sequence numbers for FlexFEC packets.
if (!fec_packets.empty() && !rtp_sender_->deferred_sequencing_) {
// Only assign sequence numbers for FEC packets in non-deferred mode, and
// never for FlexFEC which has as separate sequence number series.
const bool generate_sequence_numbers =
!rtp_sender_->packet_sender.FlexFecSsrc().has_value();
if (generate_sequence_numbers) {
@ -398,12 +450,21 @@ bool ModuleRtpRtcpImpl2::SupportsRtxPayloadPadding() const {
std::vector<std::unique_ptr<RtpPacketToSend>>
ModuleRtpRtcpImpl2::GeneratePadding(size_t target_size_bytes) {
RTC_DCHECK(rtp_sender_);
// `can_send_padding_on_media_ssrc` set to false but is ignored at this
// point, RTPSender will internally query `sequencer_` while holding the
// send lock.
RTC_DCHECK_RUN_ON(&pacer_thread_checker_);
// `can_send_padding_on_media_ssrc` set to false when deferred sequencing
// is off. It will be ignored in that case, RTPSender will internally query
// `sequencer_` while holding the send lock instead.
return rtp_sender_->packet_generator.GeneratePadding(
target_size_bytes, rtp_sender_->packet_sender.MediaHasBeenSent(),
/*can_send_padding_on_media_ssrc=*/false);
rtp_sender_->deferred_sequencing_
? rtp_sender_->sequencer_.CanSendPaddingOnMediaSsrc()
: false);
}
std::vector<RtpSequenceNumberMap::Info>