Files
platform-external-webrtc/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc
Danil Chapovalov d5cae4d59c Add hacky way to send TransportFeedback in RtcpTransceiver
With an extra interface it will allow to add both RtpRtcp module
and RtcpTransceiver as feedback sender to PacketRouter

Though hacky, this is very similar to currently used implementation
in the RTCPSender::SendFeedbackPacket

Bug: webrtc:8239
Change-Id: I237b422ae1594dede78cb63daa4aa42b6774d6fe
Reviewed-on: https://webrtc-review.googlesource.com/32680
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#21274}
2017-12-14 11:12:43 +00:00

419 lines
15 KiB
C++

/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h"
#include <utility>
#include "api/call/transport.h"
#include "modules/rtp_rtcp/include/receive_statistics.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtcp_packet.h"
#include "modules/rtp_rtcp/source/rtcp_packet/bye.h"
#include "modules/rtp_rtcp/source/rtcp_packet/common_header.h"
#include "modules/rtp_rtcp/source/rtcp_packet/extended_reports.h"
#include "modules/rtp_rtcp/source/rtcp_packet/fir.h"
#include "modules/rtp_rtcp/source/rtcp_packet/nack.h"
#include "modules/rtp_rtcp/source/rtcp_packet/pli.h"
#include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h"
#include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
#include "modules/rtp_rtcp/source/rtcp_packet/sdes.h"
#include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
#include "modules/rtp_rtcp/source/time_util.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/ptr_util.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/timeutils.h"
namespace webrtc {
namespace {
struct SenderReportTimes {
int64_t local_received_time_us;
NtpTime remote_sent_time;
};
} // namespace
struct RtcpTransceiverImpl::RemoteSenderState {
uint8_t fir_sequence_number = 0;
rtc::Optional<SenderReportTimes> last_received_sender_report;
std::vector<MediaReceiverRtcpObserver*> observers;
};
// Helper to put several RTCP packets into lower layer datagram composing
// Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2.
// TODO(danilchap): When in compound mode and packets are so many that several
// compound RTCP packets need to be generated, ensure each packet is compound.
class RtcpTransceiverImpl::PacketSender {
public:
PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback,
size_t max_packet_size)
: callback_(callback), max_packet_size_(max_packet_size) {
RTC_CHECK_LE(max_packet_size, IP_PACKET_SIZE);
}
~PacketSender() { RTC_DCHECK_EQ(index_, 0) << "Unsent rtcp packet."; }
// Appends a packet to pending compound packet.
// Sends rtcp compound packet if buffer was already full and resets buffer.
void AppendPacket(const rtcp::RtcpPacket& packet) {
packet.Create(buffer_, &index_, max_packet_size_, callback_);
}
// Sends pending rtcp compound packet.
void Send() {
if (index_ > 0) {
callback_(rtc::ArrayView<const uint8_t>(buffer_, index_));
index_ = 0;
}
}
bool IsEmpty() const { return index_ == 0; }
private:
const rtcp::RtcpPacket::PacketReadyCallback callback_;
const size_t max_packet_size_;
size_t index_ = 0;
uint8_t buffer_[IP_PACKET_SIZE];
};
RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
: config_(config), ptr_factory_(this) {
RTC_CHECK(config_.Validate());
if (config_.schedule_periodic_compound_packets)
SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms);
}
RtcpTransceiverImpl::~RtcpTransceiverImpl() = default;
void RtcpTransceiverImpl::AddMediaReceiverObserver(
uint32_t remote_ssrc,
MediaReceiverRtcpObserver* observer) {
auto& stored = remote_senders_[remote_ssrc].observers;
RTC_DCHECK(std::find(stored.begin(), stored.end(), observer) == stored.end());
stored.push_back(observer);
}
void RtcpTransceiverImpl::RemoveMediaReceiverObserver(
uint32_t remote_ssrc,
MediaReceiverRtcpObserver* observer) {
auto remote_sender_it = remote_senders_.find(remote_ssrc);
if (remote_sender_it == remote_senders_.end())
return;
auto& stored = remote_sender_it->second.observers;
auto it = std::find(stored.begin(), stored.end(), observer);
if (it == stored.end())
return;
stored.erase(it);
}
void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet,
int64_t now_us) {
while (!packet.empty()) {
rtcp::CommonHeader rtcp_block;
if (!rtcp_block.Parse(packet.data(), packet.size()))
return;
HandleReceivedPacket(rtcp_block, now_us);
// TODO(danilchap): Use packet.remove_prefix() when that function exists.
packet = packet.subview(rtcp_block.packet_size());
}
}
void RtcpTransceiverImpl::SendCompoundPacket() {
SendPeriodicCompoundPacket();
ReschedulePeriodicCompoundPackets();
}
void RtcpTransceiverImpl::SetRemb(int64_t bitrate_bps,
std::vector<uint32_t> ssrcs) {
RTC_DCHECK_GE(bitrate_bps, 0);
remb_.emplace();
remb_->SetSsrcs(std::move(ssrcs));
remb_->SetBitrateBps(bitrate_bps);
// TODO(bugs.webrtc.org/8239): Move logic from PacketRouter for sending remb
// immideately on large bitrate change when there is one RtcpTransceiver per
// rtp transport.
}
void RtcpTransceiverImpl::UnsetRemb() {
remb_.reset();
}
void RtcpTransceiverImpl::SendRawPacket(rtc::ArrayView<const uint8_t> packet) {
// Unlike other senders, this functions just tries to send packet away and
// disregard rtcp_mode, max_packet_size or anything else.
// TODO(bugs.webrtc.org/8239): respect config_ by creating the
// TransportFeedback inside this class when there is one per rtp transport.
config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
}
void RtcpTransceiverImpl::SendNack(uint32_t ssrc,
std::vector<uint16_t> sequence_numbers) {
RTC_DCHECK(!sequence_numbers.empty());
rtcp::Nack nack;
nack.SetSenderSsrc(config_.feedback_ssrc);
nack.SetMediaSsrc(ssrc);
nack.SetPacketIds(std::move(sequence_numbers));
SendImmediateFeedback(nack);
}
void RtcpTransceiverImpl::SendPictureLossIndication(uint32_t ssrc) {
rtcp::Pli pli;
pli.SetSenderSsrc(config_.feedback_ssrc);
pli.SetMediaSsrc(ssrc);
SendImmediateFeedback(pli);
}
void RtcpTransceiverImpl::SendFullIntraRequest(
rtc::ArrayView<const uint32_t> ssrcs) {
RTC_DCHECK(!ssrcs.empty());
rtcp::Fir fir;
fir.SetSenderSsrc(config_.feedback_ssrc);
for (uint32_t media_ssrc : ssrcs)
fir.AddRequestTo(media_ssrc,
remote_senders_[media_ssrc].fir_sequence_number++);
SendImmediateFeedback(fir);
}
void RtcpTransceiverImpl::HandleReceivedPacket(
const rtcp::CommonHeader& rtcp_packet_header,
int64_t now_us) {
switch (rtcp_packet_header.type()) {
case rtcp::Bye::kPacketType:
HandleBye(rtcp_packet_header);
break;
case rtcp::SenderReport::kPacketType:
HandleSenderReport(rtcp_packet_header, now_us);
break;
case rtcp::ExtendedReports::kPacketType:
HandleExtendedReports(rtcp_packet_header, now_us);
break;
}
}
void RtcpTransceiverImpl::HandleBye(
const rtcp::CommonHeader& rtcp_packet_header) {
rtcp::Bye bye;
if (!bye.Parse(rtcp_packet_header))
return;
auto remote_sender_it = remote_senders_.find(bye.sender_ssrc());
if (remote_sender_it == remote_senders_.end())
return;
for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
observer->OnBye(bye.sender_ssrc());
}
void RtcpTransceiverImpl::HandleSenderReport(
const rtcp::CommonHeader& rtcp_packet_header,
int64_t now_us) {
rtcp::SenderReport sender_report;
if (!sender_report.Parse(rtcp_packet_header))
return;
RemoteSenderState& remote_sender =
remote_senders_[sender_report.sender_ssrc()];
rtc::Optional<SenderReportTimes>& last =
remote_sender.last_received_sender_report;
last.emplace();
last->local_received_time_us = now_us;
last->remote_sent_time = sender_report.ntp();
for (MediaReceiverRtcpObserver* observer : remote_sender.observers)
observer->OnSenderReport(sender_report.sender_ssrc(), sender_report.ntp(),
sender_report.rtp_timestamp());
}
void RtcpTransceiverImpl::HandleExtendedReports(
const rtcp::CommonHeader& rtcp_packet_header,
int64_t now_us) {
rtcp::ExtendedReports extended_reports;
if (!extended_reports.Parse(rtcp_packet_header))
return;
if (extended_reports.dlrr())
HandleDlrr(extended_reports.dlrr(), now_us);
if (extended_reports.target_bitrate())
HandleTargetBitrate(*extended_reports.target_bitrate(),
extended_reports.sender_ssrc());
}
void RtcpTransceiverImpl::HandleDlrr(const rtcp::Dlrr& dlrr, int64_t now_us) {
if (!config_.non_sender_rtt_measurement || config_.rtt_observer == nullptr)
return;
// Delay and last_rr are transferred using 32bit compact ntp resolution.
// Convert packet arrival time to same format through 64bit ntp format.
uint32_t receive_time_ntp = CompactNtp(TimeMicrosToNtp(now_us));
for (const rtcp::ReceiveTimeInfo& rti : dlrr.sub_blocks()) {
if (rti.ssrc != config_.feedback_ssrc)
continue;
uint32_t rtt_ntp = receive_time_ntp - rti.delay_since_last_rr - rti.last_rr;
int64_t rtt_ms = CompactNtpRttToMs(rtt_ntp);
config_.rtt_observer->OnRttUpdate(rtt_ms);
}
}
void RtcpTransceiverImpl::HandleTargetBitrate(
const rtcp::TargetBitrate& target_bitrate,
uint32_t remote_ssrc) {
auto remote_sender_it = remote_senders_.find(remote_ssrc);
if (remote_sender_it == remote_senders_.end() ||
remote_sender_it->second.observers.empty())
return;
// Convert rtcp::TargetBitrate to BitrateAllocation from common types.
BitrateAllocation bitrate_allocation;
for (const rtcp::TargetBitrate::BitrateItem& item :
target_bitrate.GetTargetBitrates()) {
if (item.spatial_layer >= kMaxSpatialLayers ||
item.temporal_layer >= kMaxTemporalStreams) {
RTC_DLOG(LS_WARNING)
<< config_.debug_id
<< "Invalid incoming TargetBitrate with spatial layer "
<< item.spatial_layer << ", temporal layer " << item.temporal_layer;
continue;
}
bitrate_allocation.SetBitrate(item.spatial_layer, item.temporal_layer,
item.target_bitrate_kbps * 1000);
}
for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
observer->OnBitrateAllocation(remote_ssrc, bitrate_allocation);
}
void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() {
if (!config_.schedule_periodic_compound_packets)
return;
// Stop existent send task.
ptr_factory_.InvalidateWeakPtrs();
SchedulePeriodicCompoundPackets(config_.report_period_ms);
}
void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) {
class SendPeriodicCompoundPacketTask : public rtc::QueuedTask {
public:
SendPeriodicCompoundPacketTask(rtc::TaskQueue* task_queue,
rtc::WeakPtr<RtcpTransceiverImpl> ptr)
: task_queue_(task_queue), ptr_(std::move(ptr)) {}
bool Run() override {
RTC_DCHECK(task_queue_->IsCurrent());
if (!ptr_)
return true;
ptr_->SendPeriodicCompoundPacket();
task_queue_->PostDelayedTask(rtc::WrapUnique(this),
ptr_->config_.report_period_ms);
return false;
}
private:
rtc::TaskQueue* const task_queue_;
const rtc::WeakPtr<RtcpTransceiverImpl> ptr_;
};
RTC_DCHECK(config_.schedule_periodic_compound_packets);
auto task = rtc::MakeUnique<SendPeriodicCompoundPacketTask>(
config_.task_queue, ptr_factory_.GetWeakPtr());
if (delay_ms > 0)
config_.task_queue->PostDelayedTask(std::move(task), delay_ms);
else
config_.task_queue->PostTask(std::move(task));
}
void RtcpTransceiverImpl::CreateCompoundPacket(PacketSender* sender) {
RTC_DCHECK(sender->IsEmpty());
const uint32_t sender_ssrc = config_.feedback_ssrc;
int64_t now_us = rtc::TimeMicros();
rtcp::ReceiverReport receiver_report;
receiver_report.SetSenderSsrc(sender_ssrc);
receiver_report.SetReportBlocks(CreateReportBlocks(now_us));
sender->AppendPacket(receiver_report);
if (!config_.cname.empty()) {
rtcp::Sdes sdes;
bool added = sdes.AddCName(config_.feedback_ssrc, config_.cname);
RTC_DCHECK(added) << "Failed to add cname " << config_.cname
<< " to rtcp sdes packet.";
sender->AppendPacket(sdes);
}
if (remb_) {
remb_->SetSenderSsrc(sender_ssrc);
sender->AppendPacket(*remb_);
}
// TODO(bugs.webrtc.org/8239): Do not send rrtr if this packet starts with
// SenderReport instead of ReceiverReport
// when RtcpTransceiver supports rtp senders.
if (config_.non_sender_rtt_measurement) {
rtcp::ExtendedReports xr;
rtcp::Rrtr rrtr;
rrtr.SetNtp(TimeMicrosToNtp(now_us));
xr.SetRrtr(rrtr);
xr.SetSenderSsrc(sender_ssrc);
sender->AppendPacket(xr);
}
}
void RtcpTransceiverImpl::SendPeriodicCompoundPacket() {
auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
};
PacketSender sender(send_packet, config_.max_packet_size);
CreateCompoundPacket(&sender);
sender.Send();
}
void RtcpTransceiverImpl::SendImmediateFeedback(
const rtcp::RtcpPacket& rtcp_packet) {
auto send_packet = [this](rtc::ArrayView<const uint8_t> packet) {
config_.outgoing_transport->SendRtcp(packet.data(), packet.size());
};
PacketSender sender(send_packet, config_.max_packet_size);
// Compound mode requires every sent rtcp packet to be compound, i.e. start
// with a sender or receiver report.
if (config_.rtcp_mode == RtcpMode::kCompound)
CreateCompoundPacket(&sender);
sender.AppendPacket(rtcp_packet);
sender.Send();
// If compound packet was sent, delay (reschedule) the periodic one.
if (config_.rtcp_mode == RtcpMode::kCompound)
ReschedulePeriodicCompoundPackets();
}
std::vector<rtcp::ReportBlock> RtcpTransceiverImpl::CreateReportBlocks(
int64_t now_us) {
if (!config_.receive_statistics)
return {};
// TODO(danilchap): Support sending more than
// |ReceiverReport::kMaxNumberOfReportBlocks| per compound rtcp packet.
std::vector<rtcp::ReportBlock> report_blocks =
config_.receive_statistics->RtcpReportBlocks(
rtcp::ReceiverReport::kMaxNumberOfReportBlocks);
for (rtcp::ReportBlock& report_block : report_blocks) {
auto it = remote_senders_.find(report_block.source_ssrc());
if (it == remote_senders_.end() || !it->second.last_received_sender_report)
continue;
const SenderReportTimes& last_sender_report =
*it->second.last_received_sender_report;
report_block.SetLastSr(CompactNtp(last_sender_report.remote_sent_time));
report_block.SetDelayLastSr(SaturatedUsToCompactNtp(
now_us - last_sender_report.local_received_time_us));
}
return report_blocks;
}
} // namespace webrtc