Refactorings to send RTCP packets directly via the RtcpPacket callback, with some simplifications enabled by this. NACK now also sent via RtcpPacket.

BUG=webrtc:2450
R=asapersson@webrtc.org

Review URL: https://codereview.webrtc.org/1309833002 .

Cr-Commit-Position: refs/heads/master@{#10888}
This commit is contained in:
Erik Språng
2015-12-04 10:40:35 +01:00
parent 9cf0c3d4dd
commit f7c5776d42
3 changed files with 257 additions and 410 deletions

View File

@ -116,14 +116,13 @@ class RtcpPacket {
size_t HeaderLength() const;
static const size_t kHeaderLength = 4;
std::vector<RtcpPacket*> appended_packets_;
private:
bool CreateAndAddAppended(uint8_t* packet,
size_t* index,
size_t max_length,
PacketReadyCallback* callback) const;
std::vector<RtcpPacket*> appended_packets_;
};
// TODO(sprang): Move RtcpPacket subclasses out to separate files.

View File

@ -77,61 +77,61 @@ RTCPSender::FeedbackState::FeedbackState()
module(nullptr) {
}
struct RTCPSender::RtcpContext {
class PacketContainer : public rtcp::Empty,
public rtcp::RtcpPacket::PacketReadyCallback {
public:
explicit PacketContainer(Transport* transport)
: transport_(transport), bytes_sent_(0) {}
virtual ~PacketContainer() {
for (RtcpPacket* packet : appended_packets_)
delete packet;
}
void OnPacketReady(uint8_t* data, size_t length) override {
if (transport_->SendRtcp(data, length))
bytes_sent_ += length;
}
size_t SendPackets() {
rtcp::Empty::Build(this);
return bytes_sent_;
}
private:
Transport* transport_;
size_t bytes_sent_;
};
class RTCPSender::RtcpContext {
public:
RtcpContext(const FeedbackState& feedback_state,
int32_t nack_size,
const uint16_t* nack_list,
bool repeat,
uint64_t picture_id,
uint8_t* buffer,
uint32_t buffer_size)
: feedback_state(feedback_state),
nack_size(nack_size),
nack_list(nack_list),
repeat(repeat),
picture_id(picture_id),
buffer(buffer),
buffer_size(buffer_size),
ntp_sec(0),
ntp_frac(0),
position(0) {}
uint32_t ntp_sec,
uint32_t ntp_frac,
PacketContainer* container)
: feedback_state_(feedback_state),
nack_size_(nack_size),
nack_list_(nack_list),
repeat_(repeat),
picture_id_(picture_id),
ntp_sec_(ntp_sec),
ntp_frac_(ntp_frac),
container_(container) {}
uint8_t* AllocateData(uint32_t bytes) {
RTC_DCHECK_LE(position + bytes, buffer_size);
uint8_t* ptr = &buffer[position];
position += bytes;
return ptr;
}
virtual ~RtcpContext() {}
const FeedbackState& feedback_state;
int32_t nack_size;
const uint16_t* nack_list;
bool repeat;
uint64_t picture_id;
uint8_t* buffer;
uint32_t buffer_size;
uint32_t ntp_sec;
uint32_t ntp_frac;
uint32_t position;
};
const FeedbackState& feedback_state_;
const int32_t nack_size_;
const uint16_t* nack_list_;
const bool repeat_;
const uint64_t picture_id_;
const uint32_t ntp_sec_;
const uint32_t ntp_frac_;
// TODO(sprang): Once all builders use RtcpPacket, call SendToNetwork() here.
class RTCPSender::PacketBuiltCallback
: public rtcp::RtcpPacket::PacketReadyCallback {
public:
PacketBuiltCallback(RtcpContext* context) : context_(context) {}
virtual ~PacketBuiltCallback() {}
void OnPacketReady(uint8_t* data, size_t length) override {
context_->position += length;
}
bool BuildPacket(const rtcp::RtcpPacket& packet) {
return packet.BuildExternalBuffer(
&context_->buffer[context_->position],
context_->buffer_size - context_->position, this);
}
private:
RtcpContext* const context_;
PacketContainer* const container_;
};
RTCPSender::RTCPSender(
@ -474,15 +474,15 @@ int32_t RTCPSender::AddReportBlock(const RTCPReportBlock& report_block) {
return 0;
}
RTCPSender::BuildResult RTCPSender::BuildSR(RtcpContext* ctx) {
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildSR(const RtcpContext& ctx) {
for (int i = (RTCP_NUMBER_OF_SR - 2); i >= 0; i--) {
// shift old
last_send_report_[i + 1] = last_send_report_[i];
last_rtcp_time_[i + 1] = last_rtcp_time_[i];
}
last_rtcp_time_[0] = Clock::NtpToMs(ctx->ntp_sec, ctx->ntp_frac);
last_send_report_[0] = (ctx->ntp_sec << 16) + (ctx->ntp_frac >> 16);
last_rtcp_time_[0] = Clock::NtpToMs(ctx.ntp_sec_, ctx.ntp_frac_);
last_send_report_[0] = (ctx.ntp_sec_ << 16) + (ctx.ntp_frac_ >> 16);
// The timestamp of this RTCP packet should be estimated as the timestamp of
// the frame being captured at this moment. We are calculating that
@ -491,67 +491,52 @@ RTCPSender::BuildResult RTCPSender::BuildSR(RtcpContext* ctx) {
uint32_t rtp_timestamp =
start_timestamp_ + last_rtp_timestamp_ +
(clock_->TimeInMilliseconds() - last_frame_capture_time_ms_) *
(ctx->feedback_state.frequency_hz / 1000);
(ctx.feedback_state_.frequency_hz / 1000);
rtcp::SenderReport report;
report.From(ssrc_);
report.WithNtpSec(ctx->ntp_sec);
report.WithNtpFrac(ctx->ntp_frac);
report.WithRtpTimestamp(rtp_timestamp);
report.WithPacketCount(ctx->feedback_state.packets_sent);
report.WithOctetCount(ctx->feedback_state.media_bytes_sent);
rtcp::SenderReport* report = new rtcp::SenderReport();
report->From(ssrc_);
report->WithNtpSec(ctx.ntp_sec_);
report->WithNtpFrac(ctx.ntp_frac_);
report->WithRtpTimestamp(rtp_timestamp);
report->WithPacketCount(ctx.feedback_state_.packets_sent);
report->WithOctetCount(ctx.feedback_state_.media_bytes_sent);
for (auto it : report_blocks_)
report.WithReportBlock(it.second);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(report))
return BuildResult::kTruncated;
report->WithReportBlock(it.second);
report_blocks_.clear();
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::SenderReport>(report);
}
RTCPSender::BuildResult RTCPSender::BuildSDES(RtcpContext* ctx) {
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildSDES(
const RtcpContext& ctx) {
size_t length_cname = cname_.length();
RTC_CHECK_LT(length_cname, static_cast<size_t>(RTCP_CNAME_SIZE));
rtcp::Sdes sdes;
sdes.WithCName(ssrc_, cname_);
rtcp::Sdes* sdes = new rtcp::Sdes();
sdes->WithCName(ssrc_, cname_);
for (const auto it : csrc_cnames_)
sdes.WithCName(it.first, it.second);
sdes->WithCName(it.first, it.second);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(sdes))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Sdes>(sdes);
}
RTCPSender::BuildResult RTCPSender::BuildRR(RtcpContext* ctx) {
rtcp::ReceiverReport report;
report.From(ssrc_);
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildRR(const RtcpContext& ctx) {
rtcp::ReceiverReport* report = new rtcp::ReceiverReport();
report->From(ssrc_);
for (auto it : report_blocks_)
report.WithReportBlock(it.second);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(report))
return BuildResult::kTruncated;
report->WithReportBlock(it.second);
report_blocks_.clear();
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::ReceiverReport>(report);
}
RTCPSender::BuildResult RTCPSender::BuildPLI(RtcpContext* ctx) {
rtcp::Pli pli;
pli.From(ssrc_);
pli.To(remote_ssrc_);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(pli))
return BuildResult::kTruncated;
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildPLI(const RtcpContext& ctx) {
rtcp::Pli* pli = new rtcp::Pli();
pli->From(ssrc_);
pli->To(remote_ssrc_);
TRACE_EVENT_INSTANT0(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"),
"RTCPSender::PLI");
@ -559,21 +544,17 @@ RTCPSender::BuildResult RTCPSender::BuildPLI(RtcpContext* ctx) {
TRACE_COUNTER_ID1(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"), "RTCP_PLICount",
ssrc_, packet_type_counter_.pli_packets);
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Pli>(pli);
}
RTCPSender::BuildResult RTCPSender::BuildFIR(RtcpContext* ctx) {
if (!ctx->repeat)
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildFIR(const RtcpContext& ctx) {
if (!ctx.repeat_)
++sequence_number_fir_; // Do not increase if repetition.
rtcp::Fir fir;
fir.From(ssrc_);
fir.To(remote_ssrc_);
fir.WithCommandSeqNum(sequence_number_fir_);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(fir))
return BuildResult::kTruncated;
rtcp::Fir* fir = new rtcp::Fir();
fir->From(ssrc_);
fir->To(remote_ssrc_);
fir->WithCommandSeqNum(sequence_number_fir_);
TRACE_EVENT_INSTANT0(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"),
"RTCPSender::FIR");
@ -581,7 +562,7 @@ RTCPSender::BuildResult RTCPSender::BuildFIR(RtcpContext* ctx) {
TRACE_COUNTER_ID1(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"), "RTCP_FIRCount",
ssrc_, packet_type_counter_.fir_packets);
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Fir>(fir);
}
/*
@ -591,20 +572,16 @@ RTCPSender::BuildResult RTCPSender::BuildFIR(RtcpContext* ctx) {
| First | Number | PictureID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
*/
RTCPSender::BuildResult RTCPSender::BuildSLI(RtcpContext* ctx) {
rtcp::Sli sli;
sli.From(ssrc_);
sli.To(remote_ssrc_);
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildSLI(const RtcpContext& ctx) {
rtcp::Sli* sli = new rtcp::Sli();
sli->From(ssrc_);
sli->To(remote_ssrc_);
// Crop picture id to 6 least significant bits.
sli.WithPictureId(ctx->picture_id & 0x3F);
sli.WithFirstMb(0);
sli.WithNumberOfMb(0x1FFF); // 13 bits, only ones for now.
sli->WithPictureId(ctx.picture_id_ & 0x3F);
sli->WithFirstMb(0);
sli->WithNumberOfMb(0x1FFF); // 13 bits, only ones for now.
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(sli))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Sli>(sli);
}
/*
@ -619,38 +596,32 @@ RTCPSender::BuildResult RTCPSender::BuildSLI(RtcpContext* ctx) {
/*
* Note: not generic made for VP8
*/
RTCPSender::BuildResult RTCPSender::BuildRPSI(RtcpContext* ctx) {
if (ctx->feedback_state.send_payload_type == 0xFF)
return BuildResult::kError;
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildRPSI(
const RtcpContext& ctx) {
if (ctx.feedback_state_.send_payload_type == 0xFF)
return nullptr;
rtcp::Rpsi rpsi;
rpsi.From(ssrc_);
rpsi.To(remote_ssrc_);
rpsi.WithPayloadType(ctx->feedback_state.send_payload_type);
rpsi.WithPictureId(ctx->picture_id);
rtcp::Rpsi* rpsi = new rtcp::Rpsi();
rpsi->From(ssrc_);
rpsi->To(remote_ssrc_);
rpsi->WithPayloadType(ctx.feedback_state_.send_payload_type);
rpsi->WithPictureId(ctx.picture_id_);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(rpsi))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Rpsi>(rpsi);
}
RTCPSender::BuildResult RTCPSender::BuildREMB(RtcpContext* ctx) {
rtcp::Remb remb;
remb.From(ssrc_);
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildREMB(
const RtcpContext& ctx) {
rtcp::Remb* remb = new rtcp::Remb();
remb->From(ssrc_);
for (uint32_t ssrc : remb_ssrcs_)
remb.AppliesTo(ssrc);
remb.WithBitrateBps(remb_bitrate_);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(remb))
return BuildResult::kTruncated;
remb->AppliesTo(ssrc);
remb->WithBitrateBps(remb_bitrate_);
TRACE_EVENT_INSTANT0(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"),
"RTCPSender::REMB");
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Remb>(remb);
}
void RTCPSender::SetTargetBitrate(unsigned int target_bitrate) {
@ -658,9 +629,10 @@ void RTCPSender::SetTargetBitrate(unsigned int target_bitrate) {
tmmbr_send_ = target_bitrate / 1000;
}
RTCPSender::BuildResult RTCPSender::BuildTMMBR(RtcpContext* ctx) {
if (ctx->feedback_state.module == NULL)
return BuildResult::kError;
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildTMMBR(
const RtcpContext& ctx) {
if (ctx.feedback_state_.module == nullptr)
return nullptr;
// Before sending the TMMBR check the received TMMBN, only an owner is
// allowed to raise the bitrate:
// * If the sender is an owner of the TMMBN -> send TMMBR
@ -675,14 +647,14 @@ RTCPSender::BuildResult RTCPSender::BuildTMMBR(RtcpContext* ctx) {
// will accuire criticalSectionRTCPReceiver_ is a potental deadlock but
// since RTCPreceiver is not doing the reverse we should be fine
int32_t lengthOfBoundingSet =
ctx->feedback_state.module->BoundingSet(tmmbrOwner, candidateSet);
ctx.feedback_state_.module->BoundingSet(tmmbrOwner, candidateSet);
if (lengthOfBoundingSet > 0) {
for (int32_t i = 0; i < lengthOfBoundingSet; i++) {
if (candidateSet->Tmmbr(i) == tmmbr_send_ &&
candidateSet->PacketOH(i) == packet_oh_send_) {
// do not send the same tuple
return BuildResult::kAborted;
// Do not send the same tuple.
return nullptr;
}
}
if (!tmmbrOwner) {
@ -693,124 +665,69 @@ RTCPSender::BuildResult RTCPSender::BuildTMMBR(RtcpContext* ctx) {
int numCandidates = lengthOfBoundingSet + 1;
// find bounding set
TMMBRSet* boundingSet = NULL;
TMMBRSet* boundingSet = nullptr;
int numBoundingSet = tmmbr_help_.FindTMMBRBoundingSet(boundingSet);
if (numBoundingSet > 0 || numBoundingSet <= numCandidates)
tmmbrOwner = tmmbr_help_.IsOwner(ssrc_, numBoundingSet);
if (!tmmbrOwner) {
// did not enter bounding set, no meaning to send this request
return BuildResult::kAborted;
// Did not enter bounding set, no meaning to send this request.
return nullptr;
}
}
}
if (tmmbr_send_) {
rtcp::Tmmbr tmmbr;
tmmbr.From(ssrc_);
tmmbr.To(remote_ssrc_);
tmmbr.WithBitrateKbps(tmmbr_send_);
tmmbr.WithOverhead(packet_oh_send_);
if (!tmmbr_send_)
return nullptr;
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(tmmbr))
return BuildResult::kTruncated;
}
return BuildResult::kSuccess;
rtcp::Tmmbr* tmmbr = new rtcp::Tmmbr();
tmmbr->From(ssrc_);
tmmbr->To(remote_ssrc_);
tmmbr->WithBitrateKbps(tmmbr_send_);
tmmbr->WithOverhead(packet_oh_send_);
return rtc::scoped_ptr<rtcp::Tmmbr>(tmmbr);
}
RTCPSender::BuildResult RTCPSender::BuildTMMBN(RtcpContext* ctx) {
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildTMMBN(
const RtcpContext& ctx) {
TMMBRSet* boundingSet = tmmbr_help_.BoundingSetToSend();
if (boundingSet == NULL)
return BuildResult::kError;
if (boundingSet == nullptr)
return nullptr;
rtcp::Tmmbn tmmbn;
tmmbn.From(ssrc_);
rtcp::Tmmbn* tmmbn = new rtcp::Tmmbn();
tmmbn->From(ssrc_);
for (uint32_t i = 0; i < boundingSet->lengthOfSet(); i++) {
if (boundingSet->Tmmbr(i) > 0) {
tmmbn.WithTmmbr(boundingSet->Ssrc(i), boundingSet->Tmmbr(i),
boundingSet->PacketOH(i));
tmmbn->WithTmmbr(boundingSet->Ssrc(i), boundingSet->Tmmbr(i),
boundingSet->PacketOH(i));
}
}
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(tmmbn))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Tmmbn>(tmmbn);
}
RTCPSender::BuildResult RTCPSender::BuildAPP(RtcpContext* ctx) {
rtcp::App app;
app.From(ssrc_);
app.WithSubType(app_sub_type_);
app.WithName(app_name_);
app.WithData(app_data_.get(), app_length_);
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildAPP(const RtcpContext& ctx) {
rtcp::App* app = new rtcp::App();
app->From(ssrc_);
app->WithSubType(app_sub_type_);
app->WithName(app_name_);
app->WithData(app_data_.get(), app_length_);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(app))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::App>(app);
}
RTCPSender::BuildResult RTCPSender::BuildNACK(RtcpContext* ctx) {
// sanity
if (ctx->position + 16 >= IP_PACKET_SIZE) {
LOG(LS_WARNING) << "Failed to build NACK.";
return BuildResult::kTruncated;
}
// int size, uint16_t* nack_list
// add nack list
uint8_t FMT = 1;
*ctx->AllocateData(1) = 0x80 + FMT;
*ctx->AllocateData(1) = 205;
*ctx->AllocateData(1) = 0;
int nack_size_pos_ = ctx->position;
*ctx->AllocateData(1) = 3; // setting it to one kNACK signal as default
// Add our own SSRC
ByteWriter<uint32_t>::WriteBigEndian(ctx->AllocateData(4), ssrc_);
// Add the remote SSRC
ByteWriter<uint32_t>::WriteBigEndian(ctx->AllocateData(4), remote_ssrc_);
// Build NACK bitmasks and write them to the RTCP message.
// The nack list should be sorted and not contain duplicates if one
// wants to build the smallest rtcp nack packet.
int numOfNackFields = 0;
int maxNackFields =
std::min<int>(kRtcpMaxNackFields, (IP_PACKET_SIZE - ctx->position) / 4);
int i = 0;
while (i < ctx->nack_size && numOfNackFields < maxNackFields) {
uint16_t nack = ctx->nack_list[i++];
uint16_t bitmask = 0;
while (i < ctx->nack_size) {
int shift = static_cast<uint16_t>(ctx->nack_list[i] - nack) - 1;
if (shift >= 0 && shift <= 15) {
bitmask |= (1 << shift);
++i;
} else {
break;
}
}
// Write the sequence number and the bitmask to the packet.
assert(ctx->position + 4 < IP_PACKET_SIZE);
ByteWriter<uint16_t>::WriteBigEndian(ctx->AllocateData(2), nack);
ByteWriter<uint16_t>::WriteBigEndian(ctx->AllocateData(2), bitmask);
numOfNackFields++;
}
ctx->buffer[nack_size_pos_] = static_cast<uint8_t>(2 + numOfNackFields);
if (i != ctx->nack_size)
LOG(LS_WARNING) << "Nack list too large for one packet.";
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildNACK(
const RtcpContext& ctx) {
rtcp::Nack* nack = new rtcp::Nack();
nack->From(ssrc_);
nack->To(remote_ssrc_);
nack->WithList(ctx.nack_list_, ctx.nack_size_);
// Report stats.
NACKStringBuilder stringBuilder;
for (int idx = 0; idx < i; ++idx) {
stringBuilder.PushNACK(ctx->nack_list[idx]);
nack_stats_.ReportRequest(ctx->nack_list[idx]);
for (int idx = 0; idx < ctx.nack_size_; ++idx) {
stringBuilder.PushNACK(ctx.nack_list_[idx]);
nack_stats_.ReportRequest(ctx.nack_list_[idx]);
}
packet_type_counter_.nack_requests = nack_stats_.requests();
packet_type_counter_.unique_nack_requests = nack_stats_.unique_requests();
@ -822,70 +739,59 @@ RTCPSender::BuildResult RTCPSender::BuildNACK(RtcpContext* ctx) {
TRACE_COUNTER_ID1(TRACE_DISABLED_BY_DEFAULT("webrtc_rtp"), "RTCP_NACKCount",
ssrc_, packet_type_counter_.nack_packets);
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Nack>(nack);
}
RTCPSender::BuildResult RTCPSender::BuildBYE(RtcpContext* ctx) {
rtcp::Bye bye;
bye.From(ssrc_);
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildBYE(const RtcpContext& ctx) {
rtcp::Bye* bye = new rtcp::Bye();
bye->From(ssrc_);
for (uint32_t csrc : csrcs_)
bye.WithCsrc(csrc);
bye->WithCsrc(csrc);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(bye))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Bye>(bye);
}
RTCPSender::BuildResult RTCPSender::BuildReceiverReferenceTime(
RtcpContext* ctx) {
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildReceiverReferenceTime(
const RtcpContext& ctx) {
if (last_xr_rr_.size() >= RTCP_NUMBER_OF_SR)
last_xr_rr_.erase(last_xr_rr_.begin());
last_xr_rr_.insert(std::pair<uint32_t, int64_t>(
RTCPUtility::MidNtp(ctx->ntp_sec, ctx->ntp_frac),
Clock::NtpToMs(ctx->ntp_sec, ctx->ntp_frac)));
RTCPUtility::MidNtp(ctx.ntp_sec_, ctx.ntp_frac_),
Clock::NtpToMs(ctx.ntp_sec_, ctx.ntp_frac_)));
rtcp::Xr xr;
xr.From(ssrc_);
rtcp::Xr* xr = new rtcp::Xr();
xr->From(ssrc_);
rtcp::Rrtr rrtr;
rrtr.WithNtpSec(ctx->ntp_sec);
rrtr.WithNtpFrac(ctx->ntp_frac);
rrtr.WithNtpSec(ctx.ntp_sec_);
rrtr.WithNtpFrac(ctx.ntp_frac_);
xr.WithRrtr(&rrtr);
xr->WithRrtr(&rrtr);
// TODO(sprang): Merge XR report sending to contain all of RRTR, DLRR, VOIP?
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(xr))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Xr>(xr);
}
RTCPSender::BuildResult RTCPSender::BuildDlrr(RtcpContext* ctx) {
rtcp::Xr xr;
xr.From(ssrc_);
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildDlrr(
const RtcpContext& ctx) {
rtcp::Xr* xr = new rtcp::Xr();
xr->From(ssrc_);
rtcp::Dlrr dlrr;
const RtcpReceiveTimeInfo& info = ctx->feedback_state.last_xr_rr;
const RtcpReceiveTimeInfo& info = ctx.feedback_state_.last_xr_rr;
dlrr.WithDlrrItem(info.sourceSSRC, info.lastRR, info.delaySinceLastRR);
xr.WithDlrr(&dlrr);
xr->WithDlrr(&dlrr);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(xr))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Xr>(xr);
}
// TODO(sprang): Add a unit test for this, or remove if the code isn't used.
RTCPSender::BuildResult RTCPSender::BuildVoIPMetric(RtcpContext* ctx) {
rtcp::Xr xr;
xr.From(ssrc_);
rtc::scoped_ptr<rtcp::RtcpPacket> RTCPSender::BuildVoIPMetric(
const RtcpContext& context) {
rtcp::Xr* xr = new rtcp::Xr();
xr->From(ssrc_);
rtcp::VoipMetric voip;
voip.To(remote_ssrc_);
@ -910,13 +816,9 @@ RTCPSender::BuildResult RTCPSender::BuildVoIPMetric(RtcpContext* ctx) {
voip.JbMax(xr_voip_metric_.JBmax);
voip.JbAbsMax(xr_voip_metric_.JBabsMax);
xr.WithVoipMetric(&voip);
xr->WithVoipMetric(&voip);
PacketBuiltCallback callback(ctx);
if (!callback.BuildPacket(xr))
return BuildResult::kTruncated;
return BuildResult::kSuccess;
return rtc::scoped_ptr<rtcp::Xr>(xr);
}
int32_t RTCPSender::SendRTCP(const FeedbackState& feedback_state,
@ -932,43 +834,59 @@ int32_t RTCPSender::SendRTCP(const FeedbackState& feedback_state,
int32_t RTCPSender::SendCompoundRTCP(
const FeedbackState& feedback_state,
const std::set<RTCPPacketType>& packetTypes,
const std::set<RTCPPacketType>& packet_types,
int32_t nack_size,
const uint16_t* nack_list,
bool repeat,
uint64_t pictureID) {
PacketContainer container(transport_);
{
CriticalSectionScoped lock(critical_section_rtcp_sender_.get());
if (method_ == RtcpMode::kOff) {
LOG(LS_WARNING) << "Can't send rtcp if it is disabled.";
return -1;
}
// We need to send our NTP even if we haven't received any reports.
uint32_t ntp_sec;
uint32_t ntp_frac;
clock_->CurrentNtp(ntp_sec, ntp_frac);
RtcpContext context(feedback_state, nack_size, nack_list, repeat, pictureID,
ntp_sec, ntp_frac, &container);
PrepareReport(packet_types, feedback_state);
auto it = report_flags_.begin();
while (it != report_flags_.end()) {
auto builder_it = builders_.find(it->type);
RTC_DCHECK(builder_it != builders_.end());
if (it->is_volatile) {
report_flags_.erase(it++);
} else {
++it;
}
BuilderFunc func = builder_it->second;
rtc::scoped_ptr<rtcp::RtcpPacket> packet = (this->*func)(context);
if (packet.get() == nullptr)
return -1;
container.Append(packet.release());
}
if (packet_type_counter_observer_ != nullptr) {
packet_type_counter_observer_->RtcpPacketTypesCounterUpdated(
remote_ssrc_, packet_type_counter_);
}
RTC_DCHECK(AllVolatileFlagsConsumed());
}
uint8_t rtcp_buffer[IP_PACKET_SIZE];
int rtcp_length =
PrepareRTCP(feedback_state, packetTypes, nack_size, nack_list, repeat,
pictureID, rtcp_buffer, IP_PACKET_SIZE);
// Sanity don't send empty packets.
if (rtcp_length <= 0)
return -1;
return SendToNetwork(rtcp_buffer, static_cast<size_t>(rtcp_length));
size_t bytes_sent = container.SendPackets();
return bytes_sent == 0 ? -1 : 0;
}
int RTCPSender::PrepareRTCP(const FeedbackState& feedback_state,
const std::set<RTCPPacketType>& packetTypes,
int32_t nack_size,
const uint16_t* nack_list,
bool repeat,
uint64_t pictureID,
uint8_t* rtcp_buffer,
int buffer_size) {
CriticalSectionScoped lock(critical_section_rtcp_sender_.get());
RtcpContext context(feedback_state, nack_size, nack_list, repeat, pictureID,
rtcp_buffer, buffer_size);
void RTCPSender::PrepareReport(const std::set<RTCPPacketType>& packetTypes,
const FeedbackState& feedback_state) {
// Add all flags as volatile. Non volatile entries will not be overwritten
// and all new volatile flags added will be consumed by the end of this call.
SetFlags(packetTypes, true);
@ -992,9 +910,6 @@ int RTCPSender::PrepareRTCP(const FeedbackState& feedback_state,
if (IsFlagPresent(kRtcpSr) || (IsFlagPresent(kRtcpRr) && !cname_.empty()))
SetFlag(kRtcpSdes, true);
// We need to send our NTP even if we haven't received any reports.
clock_->CurrentNtp(context.ntp_sec, context.ntp_frac);
if (generate_report) {
if (!sending_ && xr_send_receiver_reference_time_enabled_)
SetFlag(kRtcpXrReceiverReferenceTime, true);
@ -1028,8 +943,8 @@ int RTCPSender::PrepareRTCP(const FeedbackState& feedback_state,
if (!statisticians.empty()) {
for (auto it = statisticians.begin(); it != statisticians.end(); ++it) {
RTCPReportBlock report_block;
if (PrepareReport(feedback_state, it->first, it->second,
&report_block)) {
if (PrepareReportBlock(feedback_state, it->first, it->second,
&report_block)) {
// TODO(danilchap) AddReportBlock may fail (for 2 different reasons).
// Probably it shouldn't be ignored.
AddReportBlock(report_block);
@ -1037,48 +952,12 @@ int RTCPSender::PrepareRTCP(const FeedbackState& feedback_state,
}
}
}
auto it = report_flags_.begin();
while (it != report_flags_.end()) {
auto builder = builders_.find(it->type);
RTC_DCHECK(builder != builders_.end());
if (it->is_volatile) {
report_flags_.erase(it++);
} else {
++it;
}
uint32_t start_position = context.position;
BuildResult result = (this->*(builder->second))(&context);
switch (result) {
case BuildResult::kError:
return -1;
case BuildResult::kTruncated:
return context.position;
case BuildResult::kAborted:
context.position = start_position;
FALLTHROUGH();
case BuildResult::kSuccess:
continue;
default:
abort();
}
}
if (packet_type_counter_observer_ != NULL) {
packet_type_counter_observer_->RtcpPacketTypesCounterUpdated(
remote_ssrc_, packet_type_counter_);
}
RTC_DCHECK(AllVolatileFlagsConsumed());
return context.position;
}
bool RTCPSender::PrepareReport(const FeedbackState& feedback_state,
uint32_t ssrc,
StreamStatistician* statistician,
RTCPReportBlock* report_block) {
bool RTCPSender::PrepareReportBlock(const FeedbackState& feedback_state,
uint32_t ssrc,
StreamStatistician* statistician,
RTCPReportBlock* report_block) {
// Do we have receive statistics to send?
RtcpStatistics stats;
if (!statistician->GetStatistics(&stats, true))
@ -1116,12 +995,6 @@ bool RTCPSender::PrepareReport(const FeedbackState& feedback_state,
return true;
}
int32_t RTCPSender::SendToNetwork(const uint8_t* dataBuffer, size_t length) {
if (transport_->SendRtcp(dataBuffer, length))
return 0;
return -1;
}
void RTCPSender::SetCsrcs(const std::vector<uint32_t>& csrcs) {
assert(csrcs.size() <= kRtpCsrcSize);
CriticalSectionScoped lock(critical_section_rtcp_sender_.get());

View File

@ -149,77 +149,53 @@ public:
bool SendFeedbackPacket(const rtcp::TransportFeedback& packet);
private:
struct RtcpContext;
class RtcpContext;
// The BuildResult indicates the outcome of a call to a builder method,
// constructing a part of an RTCP packet:
//
// kError
// Building RTCP packet failed, propagate error out to caller.
// kAbort
// The (partial) block being build should not be included. Reset current
// buffer position to the state before the method call and proceed to the
// next packet type.
// kTruncated
// There is not enough room in the buffer to fit the data being constructed.
// (IP packet is full). Proceed to the next packet type, and call this
// method again when a new buffer has been allocated.
// TODO(sprang): Actually allocate multiple packets if needed.
// kSuccess
// Data has been successfully placed in the buffer.
enum class BuildResult { kError, kAborted, kTruncated, kSuccess };
int32_t SendToNetwork(const uint8_t* dataBuffer, size_t length);
// Determine which RTCP messages should be sent and setup flags.
void PrepareReport(const std::set<RTCPPacketType>& packetTypes,
const FeedbackState& feedback_state)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
int32_t AddReportBlock(const RTCPReportBlock& report_block)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
bool PrepareReport(const FeedbackState& feedback_state,
uint32_t ssrc,
StreamStatistician* statistician,
RTCPReportBlock* report_block);
bool PrepareReportBlock(const FeedbackState& feedback_state,
uint32_t ssrc,
StreamStatistician* statistician,
RTCPReportBlock* report_block);
int PrepareRTCP(const FeedbackState& feedback_state,
const std::set<RTCPPacketType>& packetTypes,
int32_t nackSize,
const uint16_t* nackList,
bool repeat,
uint64_t pictureID,
uint8_t* rtcp_buffer,
int buffer_size);
BuildResult BuildSR(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildSR(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildRR(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildRR(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildSDES(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildSDES(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildPLI(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildPLI(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildREMB(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildREMB(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildTMMBR(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildTMMBR(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildTMMBN(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildTMMBN(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildAPP(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildAPP(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildVoIPMetric(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildVoIPMetric(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildBYE(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildBYE(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildFIR(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildFIR(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildSLI(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildSLI(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildRPSI(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildRPSI(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildNACK(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildNACK(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildReceiverReferenceTime(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildReceiverReferenceTime(
const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
BuildResult BuildDlrr(RtcpContext* context)
rtc::scoped_ptr<rtcp::RtcpPacket> BuildDlrr(const RtcpContext& context)
EXCLUSIVE_LOCKS_REQUIRED(critical_section_rtcp_sender_);
private:
@ -316,10 +292,9 @@ private:
std::set<ReportFlag> report_flags_ GUARDED_BY(critical_section_rtcp_sender_);
typedef BuildResult (RTCPSender::*Builder)(RtcpContext*);
std::map<RTCPPacketType, Builder> builders_;
class PacketBuiltCallback;
typedef rtc::scoped_ptr<rtcp::RtcpPacket> (RTCPSender::*BuilderFunc)(
const RtcpContext&);
std::map<RTCPPacketType, BuilderFunc> builders_;
};
} // namespace webrtc