Add support for fragmentation in RtcpPacket.

If the buffer becomes full an OnPacketReady callback will be used to
send the packets created so far. On success the buffer can be reused.
The same callback will be called when the last packet has beed created.

Also made some changes to RawPacket. Buffer will now be heap-allocated
rather than (potentially) stack-allocated, but on the plus side it can
now be allocted with variable size and also avoids one memcpy.

BUG=

patch from issue 56429004 at patchset 160001 (http://crrev.com/56429004#ps160001)

R=asapersson@webrtc.org

Review URL: https://codereview.webrtc.org/1165113002

Cr-Commit-Position: refs/heads/master@{#9390}
This commit is contained in:
Erik Språng
2015-06-08 09:54:14 +02:00
parent 1aff095b6c
commit c1b9d4e686
5 changed files with 862 additions and 479 deletions

View File

@ -10,6 +10,7 @@
#include "webrtc/modules/rtp_rtcp/source/rtcp_packet.h"
#include "webrtc/base/checks.h"
#include "webrtc/modules/rtp_rtcp/source/byte_io.h"
#include "webrtc/system_wrappers/interface/logging.h"
@ -290,10 +291,8 @@ void CreateBye(const RTCPPacketBYE& bye,
size_t* pos) {
CreateHeader(length, PT_BYE, length, buffer, pos);
AssignUWord32(buffer, pos, bye.SenderSSRC);
for (std::vector<uint32_t>::const_iterator it = csrcs.begin();
it != csrcs.end(); ++it) {
AssignUWord32(buffer, pos, *it);
}
for (uint32_t csrc : csrcs)
AssignUWord32(buffer, pos, csrc);
}
// Application-Defined packet (APP) (RFC 3550).
@ -392,6 +391,8 @@ void CreateSli(const RTCPPacketPSFBSLI& sli,
void CreateNack(const RTCPPacketRTPFBNACK& nack,
const std::vector<RTCPPacketRTPFBNACKItem>& nack_fields,
size_t start_index,
size_t end_index,
size_t length,
uint8_t* buffer,
size_t* pos) {
@ -399,10 +400,10 @@ void CreateNack(const RTCPPacketRTPFBNACK& nack,
CreateHeader(kFmt, PT_RTPFB, length, buffer, pos);
AssignUWord32(buffer, pos, nack.SenderSSRC);
AssignUWord32(buffer, pos, nack.MediaSSRC);
for (std::vector<RTCPPacketRTPFBNACKItem>::const_iterator
it = nack_fields.begin(); it != nack_fields.end(); ++it) {
AssignUWord16(buffer, pos, (*it).PacketID);
AssignUWord16(buffer, pos, (*it).BitMask);
for (size_t i = start_index; i < end_index; ++i) {
const RTCPPacketRTPFBNACKItem& nack_item = nack_fields[i];
AssignUWord16(buffer, pos, nack_item.PacketID);
AssignUWord16(buffer, pos, nack_item.BitMask);
}
}
@ -727,105 +728,162 @@ void RtcpPacket::Append(RtcpPacket* packet) {
appended_packets_.push_back(packet);
}
RawPacket RtcpPacket::Build() const {
rtc::scoped_ptr<RawPacket> RtcpPacket::Build() const {
size_t length = 0;
uint8_t packet[IP_PACKET_SIZE];
CreateAndAddAppended(packet, &length, IP_PACKET_SIZE);
return RawPacket(packet, length);
rtc::scoped_ptr<RawPacket> packet(new RawPacket(IP_PACKET_SIZE));
class PacketVerifier : public PacketReadyCallback {
public:
explicit PacketVerifier(RawPacket* packet)
: called_(false), packet_(packet) {}
virtual ~PacketVerifier() {}
void OnPacketReady(uint8_t* data, size_t length) override {
CHECK(!called_) << "Fragmentation not supported.";
called_ = true;
packet_->SetLength(length);
}
private:
bool called_;
RawPacket* const packet_;
} verifier(packet.get());
CreateAndAddAppended(packet->MutableBuffer(), &length, packet->BufferLength(),
&verifier);
OnBufferFull(packet->MutableBuffer(), &length, &verifier);
return packet;
}
void RtcpPacket::Build(uint8_t* packet,
size_t* length,
size_t max_length) const {
*length = 0;
CreateAndAddAppended(packet, length, max_length);
bool RtcpPacket::Build(PacketReadyCallback* callback) const {
uint8_t buffer[IP_PACKET_SIZE];
return BuildExternalBuffer(buffer, IP_PACKET_SIZE, callback);
}
void RtcpPacket::CreateAndAddAppended(uint8_t* packet,
size_t* length,
size_t max_length) const {
Create(packet, length, max_length);
for (std::vector<RtcpPacket*>::const_iterator it = appended_packets_.begin();
it != appended_packets_.end(); ++it) {
(*it)->CreateAndAddAppended(packet, length, max_length);
bool RtcpPacket::BuildExternalBuffer(uint8_t* buffer,
size_t max_length,
PacketReadyCallback* callback) const {
size_t index = 0;
if (!CreateAndAddAppended(buffer, &index, max_length, callback))
return false;
return OnBufferFull(buffer, &index, callback);
}
bool RtcpPacket::CreateAndAddAppended(uint8_t* packet,
size_t* index,
size_t max_length,
PacketReadyCallback* callback) const {
if (!Create(packet, index, max_length, callback))
return false;
for (RtcpPacket* appended : appended_packets_) {
if (!appended->CreateAndAddAppended(packet, index, max_length, callback))
return false;
}
return true;
}
void Empty::Create(uint8_t* packet, size_t* length, size_t max_length) const {
bool RtcpPacket::OnBufferFull(uint8_t* packet,
size_t* index,
RtcpPacket::PacketReadyCallback* callback) const {
if (*index == 0)
return false;
callback->OnPacketReady(packet, *index);
*index = 0;
return true;
}
void SenderReport::Create(uint8_t* packet,
size_t* length,
size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Empty::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
return true;
}
bool SenderReport::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateSenderReport(sr_, BlockToHeaderLength(BlockLength()), packet, length);
CreateReportBlocks(report_blocks_, packet, length);
CreateSenderReport(sr_, BlockToHeaderLength(BlockLength()), packet, index);
CreateReportBlocks(report_blocks_, packet, index);
return true;
}
void SenderReport::WithReportBlock(ReportBlock* block) {
bool SenderReport::WithReportBlock(ReportBlock* block) {
assert(block);
if (report_blocks_.size() >= kMaxNumberOfReportBlocks) {
LOG(LS_WARNING) << "Max report blocks reached.";
return;
return false;
}
report_blocks_.push_back(block->report_block_);
sr_.NumberOfReportBlocks = report_blocks_.size();
return true;
}
void ReceiverReport::Create(uint8_t* packet,
size_t* length,
size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool ReceiverReport::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateReceiverReport(rr_, BlockToHeaderLength(BlockLength()), packet, length);
CreateReportBlocks(report_blocks_, packet, length);
CreateReceiverReport(rr_, BlockToHeaderLength(BlockLength()), packet, index);
CreateReportBlocks(report_blocks_, packet, index);
return true;
}
void ReceiverReport::WithReportBlock(ReportBlock* block) {
bool ReceiverReport::WithReportBlock(ReportBlock* block) {
assert(block);
if (report_blocks_.size() >= kMaxNumberOfReportBlocks) {
LOG(LS_WARNING) << "Max report blocks reached.";
return;
return false;
}
report_blocks_.push_back(block->report_block_);
rr_.NumberOfReportBlocks = report_blocks_.size();
return true;
}
void Ij::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Ij::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateIj(ij_items_, packet, length);
CreateIj(ij_items_, packet, index);
return true;
}
void Ij::WithJitterItem(uint32_t jitter) {
bool Ij::WithJitterItem(uint32_t jitter) {
if (ij_items_.size() >= kMaxNumberOfIjItems) {
LOG(LS_WARNING) << "Max inter-arrival jitter items reached.";
return;
return false;
}
ij_items_.push_back(jitter);
return true;
}
void Sdes::Create(uint8_t* packet, size_t* length, size_t max_length) const {
bool Sdes::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
assert(!chunks_.empty());
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateSdes(chunks_, BlockToHeaderLength(BlockLength()), packet, length);
CreateSdes(chunks_, BlockToHeaderLength(BlockLength()), packet, index);
return true;
}
void Sdes::WithCName(uint32_t ssrc, std::string cname) {
bool Sdes::WithCName(uint32_t ssrc, const std::string& cname) {
assert(cname.length() <= 0xff);
if (chunks_.size() >= kMaxNumberOfChunks) {
LOG(LS_WARNING) << "Max SDES chunks reached.";
return;
return false;
}
// In each chunk, the list of items must be terminated by one or more null
// octets. The next chunk must start on a 32-bit boundary.
@ -836,6 +894,7 @@ void Sdes::WithCName(uint32_t ssrc, std::string cname) {
chunk.name = cname;
chunk.null_octets = null_octets;
chunks_.push_back(chunk);
return true;
}
size_t Sdes::BlockLength() const {
@ -843,63 +902,95 @@ size_t Sdes::BlockLength() const {
// Chunk:
// SSRC/CSRC (4 bytes) | CNAME (1 byte) | length (1 byte) | name | padding.
size_t length = kHeaderLength;
for (std::vector<Chunk>::const_iterator it = chunks_.begin();
it != chunks_.end(); ++it) {
length += 6 + (*it).name.length() + (*it).null_octets;
}
for (const Chunk& chunk : chunks_)
length += 6 + chunk.name.length() + chunk.null_octets;
assert(length % 4 == 0);
return length;
}
void Bye::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Bye::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateBye(bye_, csrcs_, BlockToHeaderLength(BlockLength()), packet, length);
CreateBye(bye_, csrcs_, BlockToHeaderLength(BlockLength()), packet, index);
return true;
}
void Bye::WithCsrc(uint32_t csrc) {
bool Bye::WithCsrc(uint32_t csrc) {
if (csrcs_.size() >= kMaxNumberOfCsrcs) {
LOG(LS_WARNING) << "Max CSRC size reached.";
return;
return false;
}
csrcs_.push_back(csrc);
return true;
}
void App::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool App::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateApp(app_, ssrc_, BlockToHeaderLength(BlockLength()), packet, length);
CreateApp(app_, ssrc_, BlockToHeaderLength(BlockLength()), packet, index);
return true;
}
void Pli::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Pli::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreatePli(pli_, BlockToHeaderLength(BlockLength()), packet, length);
CreatePli(pli_, BlockToHeaderLength(BlockLength()), packet, index);
return true;
}
void Sli::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Sli::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateSli(sli_, sli_item_, BlockToHeaderLength(BlockLength()), packet,
length);
CreateSli(sli_, sli_item_, BlockToHeaderLength(BlockLength()), packet, index);
return true;
}
void Nack::Create(uint8_t* packet, size_t* length, size_t max_length) const {
bool Nack::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
assert(!nack_fields_.empty());
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
}
CreateNack(nack_, nack_fields_, BlockToHeaderLength(BlockLength()), packet,
length);
// If nack list can't fit in packet, try to fragment.
size_t nack_index = 0;
do {
size_t bytes_left_in_buffer = max_length - *index;
if (bytes_left_in_buffer < kCommonFbFmtLength + 4) {
if (!OnBufferFull(packet, index, callback))
return false;
continue;
}
int64_t num_nack_fields =
std::min((bytes_left_in_buffer - kCommonFbFmtLength) / 4,
nack_fields_.size() - nack_index);
CreateNack(nack_, nack_fields_, nack_index, nack_index + num_nack_fields,
BlockToHeaderLength((num_nack_fields * 4) + kCommonFbFmtLength),
packet, index);
nack_index += num_nack_fields;
} while (nack_index < nack_fields_.size());
return true;
}
void Nack::WithList(const uint16_t* nack_list, int length) {
@ -926,14 +1017,18 @@ void Nack::WithList(const uint16_t* nack_list, int length) {
}
}
void Rpsi::Create(uint8_t* packet, size_t* length, size_t max_length) const {
bool Rpsi::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
assert(rpsi_.NumberOfValidBits > 0);
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateRpsi(rpsi_, padding_bytes_, BlockToHeaderLength(BlockLength()), packet,
length);
index);
return true;
}
void Rpsi::WithPictureId(uint64_t picture_id) {
@ -963,22 +1058,29 @@ void Rpsi::WithPictureId(uint64_t picture_id) {
}
}
void Fir::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Fir::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateFir(fir_, fir_item_, BlockToHeaderLength(BlockLength()), packet,
length);
CreateFir(fir_, fir_item_, BlockToHeaderLength(BlockLength()), packet, index);
return true;
}
void Remb::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Remb::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateRemb(remb_, remb_item_, BlockToHeaderLength(BlockLength()), packet,
length);
index);
return true;
}
void Remb::AppliesTo(uint32_t ssrc) {
@ -989,74 +1091,89 @@ void Remb::AppliesTo(uint32_t ssrc) {
remb_item_.SSRCs[remb_item_.NumberOfSSRCs++] = ssrc;
}
void Tmmbr::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Tmmbr::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateTmmbr(tmmbr_, tmmbr_item_, BlockToHeaderLength(BlockLength()), packet,
length);
index);
return true;
}
void Tmmbn::WithTmmbr(uint32_t ssrc, uint32_t bitrate_kbps, uint16_t overhead) {
bool Tmmbn::WithTmmbr(uint32_t ssrc, uint32_t bitrate_kbps, uint16_t overhead) {
assert(overhead <= 0x1ff);
if (tmmbn_items_.size() >= kMaxNumberOfTmmbrs) {
LOG(LS_WARNING) << "Max TMMBN size reached.";
return;
return false;
}
RTCPPacketRTPFBTMMBRItem tmmbn_item;
tmmbn_item.SSRC = ssrc;
tmmbn_item.MaxTotalMediaBitRate = bitrate_kbps;
tmmbn_item.MeasuredOverhead = overhead;
tmmbn_items_.push_back(tmmbn_item);
return true;
}
void Tmmbn::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Tmmbn::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateTmmbn(tmmbn_, tmmbn_items_, BlockToHeaderLength(BlockLength()), packet,
length);
index);
return true;
}
void Xr::Create(uint8_t* packet, size_t* length, size_t max_length) const {
if (*length + BlockLength() > max_length) {
LOG(LS_WARNING) << "Max packet size reached.";
return;
bool Xr::Create(uint8_t* packet,
size_t* index,
size_t max_length,
RtcpPacket::PacketReadyCallback* callback) const {
while (*index + BlockLength() > max_length) {
if (!OnBufferFull(packet, index, callback))
return false;
}
CreateXrHeader(xr_header_, BlockToHeaderLength(BlockLength()), packet,
length);
CreateRrtr(rrtr_blocks_, packet, length);
CreateDlrr(dlrr_blocks_, packet, length);
CreateVoipMetric(voip_metric_blocks_, packet, length);
CreateXrHeader(xr_header_, BlockToHeaderLength(BlockLength()), packet, index);
CreateRrtr(rrtr_blocks_, packet, index);
CreateDlrr(dlrr_blocks_, packet, index);
CreateVoipMetric(voip_metric_blocks_, packet, index);
return true;
}
void Xr::WithRrtr(Rrtr* rrtr) {
bool Xr::WithRrtr(Rrtr* rrtr) {
assert(rrtr);
if (rrtr_blocks_.size() >= kMaxNumberOfRrtrBlocks) {
LOG(LS_WARNING) << "Max RRTR blocks reached.";
return;
return false;
}
rrtr_blocks_.push_back(rrtr->rrtr_block_);
return true;
}
void Xr::WithDlrr(Dlrr* dlrr) {
bool Xr::WithDlrr(Dlrr* dlrr) {
assert(dlrr);
if (dlrr_blocks_.size() >= kMaxNumberOfDlrrBlocks) {
LOG(LS_WARNING) << "Max DLRR blocks reached.";
return;
return false;
}
dlrr_blocks_.push_back(dlrr->dlrr_block_);
return true;
}
void Xr::WithVoipMetric(VoipMetric* voip_metric) {
bool Xr::WithVoipMetric(VoipMetric* voip_metric) {
assert(voip_metric);
if (voip_metric_blocks_.size() >= kMaxNumberOfVoipMetricBlocks) {
LOG(LS_WARNING) << "Max Voip Metric blocks reached.";
return;
return false;
}
voip_metric_blocks_.push_back(voip_metric->metric_);
return true;
}
size_t Xr::DlrrLength() const {
@ -1072,18 +1189,51 @@ size_t Xr::DlrrLength() const {
return length;
}
void Dlrr::WithDlrrItem(uint32_t ssrc,
bool Dlrr::WithDlrrItem(uint32_t ssrc,
uint32_t last_rr,
uint32_t delay_last_rr) {
if (dlrr_block_.size() >= kMaxNumberOfDlrrItems) {
LOG(LS_WARNING) << "Max DLRR items reached.";
return;
return false;
}
RTCPPacketXRDLRRReportBlockItem dlrr;
dlrr.SSRC = ssrc;
dlrr.LastRR = last_rr;
dlrr.DelayLastRR = delay_last_rr;
dlrr_block_.push_back(dlrr);
return true;
}
RawPacket::RawPacket(size_t buffer_length)
: buffer_length_(buffer_length), length_(0) {
buffer_.reset(new uint8_t[buffer_length]);
}
RawPacket::RawPacket(const uint8_t* packet, size_t packet_length)
: buffer_length_(packet_length), length_(packet_length) {
buffer_.reset(new uint8_t[packet_length]);
memcpy(buffer_.get(), packet, packet_length);
}
const uint8_t* RawPacket::Buffer() const {
return buffer_.get();
}
uint8_t* RawPacket::MutableBuffer() {
return buffer_.get();
}
size_t RawPacket::BufferLength() const {
return buffer_length_;
}
size_t RawPacket::Length() const {
return length_;
}
void RawPacket::SetLength(size_t length) {
assert(length <= buffer_length_);
length_ = length;
}
} // namespace rtcp