Add Slice method to CopyOnWriteBuffer and use it in FEC code.

This avoids unnecessary memcpy calls.

Bug: webrtc:10750
Change-Id: I73fe8f1c9659f2c5e59d7fb97b80349a3504a34a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/145320
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Rasmus Brandt <brandtr@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Ilya Nikolaevskiy <ilnik@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29315}
This commit is contained in:
Ilya Nikolaevskiy
2019-09-25 14:37:10 +02:00
committed by Commit Bot
parent 85d5c197a8
commit 741bab0f6c
5 changed files with 147 additions and 43 deletions

View File

@ -109,10 +109,8 @@ FlexfecReceiver::AddReceivedPacket(const RtpPacketReceived& packet) {
// Insert packet payload into erasure code.
received_packet->pkt = rtc::scoped_refptr<ForwardErrorCorrection::Packet>(
new ForwardErrorCorrection::Packet());
// TODO(ilnik): after slice capability is added to COW, use it here instead
// of initializing COW buffer with ArrayView.
auto payload = packet.payload();
received_packet->pkt->data.SetData(payload.data(), payload.size());
received_packet->pkt->data =
packet.Buffer().Slice(packet.headers_size(), packet.payload_size());
} else {
// This is a media packet, or a FlexFEC packet belonging to some
// other FlexFEC stream.

View File

@ -119,13 +119,14 @@ bool UlpfecReceiverImpl::AddReceivedRedPacket(const RtpPacket& rtp_packet,
packet_counter_.first_packet_time_ms = rtc::TimeMillis();
}
auto red_payload = rtp_packet.payload().subview(kRedHeaderLength);
if (received_packet->is_fec) {
++packet_counter_.num_fec_packets;
// everything behind the RED header
received_packet->pkt->data.SetData(red_payload.data(), red_payload.size());
received_packet->pkt->data =
rtp_packet.Buffer().Slice(rtp_packet.headers_size() + kRedHeaderLength,
rtp_packet.payload_size() - kRedHeaderLength);
} else {
auto red_payload = rtp_packet.payload().subview(kRedHeaderLength);
received_packet->pkt->data.EnsureCapacity(rtp_packet.headers_size() +
red_payload.size());
// Copy RTP header.
@ -170,6 +171,7 @@ int32_t UlpfecReceiverImpl::ProcessReceivedFec() {
crit_sect_.Enter();
// Create a packet with the buffer to modify it.
RtpPacketReceived rtp_packet;
const uint8_t* const original_data = packet->data.cdata();
rtp_packet.Parse(packet->data);
rtp_packet.IdentifyExtensions(extensions_);
// Reset buffer reference, so zeroing would work on a buffer with a
@ -177,6 +179,8 @@ int32_t UlpfecReceiverImpl::ProcessReceivedFec() {
packet->data = rtc::CopyOnWriteBuffer(0);
rtp_packet.ZeroMutableExtensions();
packet->data = rtp_packet.Buffer();
// Ensure that zeroing of extensions was done in place.
RTC_DCHECK_EQ(packet->data.cdata(), original_data);
}
fec_->DecodeFec(*received_packet, &recovered_packets_);
}

View File

@ -14,40 +14,47 @@
namespace rtc {
CopyOnWriteBuffer::CopyOnWriteBuffer() {
CopyOnWriteBuffer::CopyOnWriteBuffer() : offset_(0), size_(0) {
RTC_DCHECK(IsConsistent());
}
CopyOnWriteBuffer::CopyOnWriteBuffer(const CopyOnWriteBuffer& buf)
: buffer_(buf.buffer_) {}
: buffer_(buf.buffer_), offset_(buf.offset_), size_(buf.size_) {}
CopyOnWriteBuffer::CopyOnWriteBuffer(CopyOnWriteBuffer&& buf)
: buffer_(std::move(buf.buffer_)) {}
: buffer_(std::move(buf.buffer_)), offset_(buf.offset_), size_(buf.size_) {
buf.offset_ = 0;
buf.size_ = 0;
RTC_DCHECK(IsConsistent());
}
CopyOnWriteBuffer::CopyOnWriteBuffer(const std::string& s)
: CopyOnWriteBuffer(s.data(), s.length()) {}
CopyOnWriteBuffer::CopyOnWriteBuffer(size_t size)
: buffer_(size > 0 ? new RefCountedObject<Buffer>(size) : nullptr) {
: buffer_(size > 0 ? new RefCountedObject<Buffer>(size) : nullptr),
offset_(0),
size_(size) {
RTC_DCHECK(IsConsistent());
}
CopyOnWriteBuffer::CopyOnWriteBuffer(size_t size, size_t capacity)
: buffer_(size > 0 || capacity > 0
? new RefCountedObject<Buffer>(size, capacity)
: nullptr) {
: nullptr),
offset_(0),
size_(size) {
RTC_DCHECK(IsConsistent());
}
CopyOnWriteBuffer::~CopyOnWriteBuffer() = default;
bool CopyOnWriteBuffer::operator==(const CopyOnWriteBuffer& buf) const {
// Must either use the same buffer internally or have the same contents.
// Must either be the same view of the same buffer or have the same contents.
RTC_DCHECK(IsConsistent());
RTC_DCHECK(buf.IsConsistent());
return buffer_.get() == buf.buffer_.get() ||
(buffer_.get() && buf.buffer_.get() &&
*buffer_.get() == *buf.buffer_.get());
return size_ == buf.size_ &&
(cdata() == buf.cdata() || memcmp(cdata(), buf.cdata(), size_) == 0);
}
void CopyOnWriteBuffer::SetSize(size_t size) {
@ -55,35 +62,39 @@ void CopyOnWriteBuffer::SetSize(size_t size) {
if (!buffer_) {
if (size > 0) {
buffer_ = new RefCountedObject<Buffer>(size);
offset_ = 0;
size_ = size;
}
RTC_DCHECK(IsConsistent());
return;
}
// Clone data if referenced.
if (!buffer_->HasOneRef()) {
buffer_ = new RefCountedObject<Buffer>(buffer_->data(),
std::min(buffer_->size(), size),
std::max(buffer_->capacity(), size));
if (size <= size_) {
size_ = size;
return;
}
buffer_->SetSize(size);
UnshareAndEnsureCapacity(std::max(capacity(), size));
buffer_->SetSize(size + offset_);
size_ = size;
RTC_DCHECK(IsConsistent());
}
void CopyOnWriteBuffer::EnsureCapacity(size_t capacity) {
void CopyOnWriteBuffer::EnsureCapacity(size_t new_capacity) {
RTC_DCHECK(IsConsistent());
if (!buffer_) {
if (capacity > 0) {
buffer_ = new RefCountedObject<Buffer>(0, capacity);
if (new_capacity > 0) {
buffer_ = new RefCountedObject<Buffer>(0, new_capacity);
offset_ = 0;
size_ = 0;
}
RTC_DCHECK(IsConsistent());
return;
} else if (capacity <= buffer_->capacity()) {
} else if (new_capacity <= capacity()) {
return;
}
CloneDataIfReferenced(std::max(buffer_->capacity(), capacity));
buffer_->EnsureCapacity(capacity);
UnshareAndEnsureCapacity(new_capacity);
RTC_DCHECK(IsConsistent());
}
@ -94,18 +105,21 @@ void CopyOnWriteBuffer::Clear() {
if (buffer_->HasOneRef()) {
buffer_->Clear();
} else {
buffer_ = new RefCountedObject<Buffer>(0, buffer_->capacity());
buffer_ = new RefCountedObject<Buffer>(0, capacity());
}
offset_ = 0;
size_ = 0;
RTC_DCHECK(IsConsistent());
}
void CopyOnWriteBuffer::CloneDataIfReferenced(size_t new_capacity) {
if (buffer_->HasOneRef()) {
void CopyOnWriteBuffer::UnshareAndEnsureCapacity(size_t new_capacity) {
if (buffer_->HasOneRef() && new_capacity <= capacity()) {
return;
}
buffer_ = new RefCountedObject<Buffer>(buffer_->data(), buffer_->size(),
buffer_ = new RefCountedObject<Buffer>(buffer_->data() + offset_, size_,
new_capacity);
offset_ = 0;
RTC_DCHECK(IsConsistent());
}

View File

@ -56,6 +56,8 @@ class CopyOnWriteBuffer {
: CopyOnWriteBuffer(size, capacity) {
if (buffer_) {
std::memcpy(buffer_->data(), data, size);
offset_ = 0;
size_ = size;
}
}
@ -88,8 +90,8 @@ class CopyOnWriteBuffer {
if (!buffer_) {
return nullptr;
}
CloneDataIfReferenced(buffer_->capacity());
return buffer_->data<T>();
UnshareAndEnsureCapacity(capacity());
return buffer_->data<T>() + offset_;
}
// Get const pointer to the data. This will not create a copy of the
@ -102,17 +104,17 @@ class CopyOnWriteBuffer {
if (!buffer_) {
return nullptr;
}
return buffer_->data<T>();
return buffer_->data<T>() + offset_;
}
size_t size() const {
RTC_DCHECK(IsConsistent());
return buffer_ ? buffer_->size() : 0;
return size_;
}
size_t capacity() const {
RTC_DCHECK(IsConsistent());
return buffer_ ? buffer_->capacity() : 0;
return buffer_ ? buffer_->capacity() - offset_ : 0;
}
CopyOnWriteBuffer& operator=(const CopyOnWriteBuffer& buf) {
@ -120,6 +122,8 @@ class CopyOnWriteBuffer {
RTC_DCHECK(buf.IsConsistent());
if (&buf != this) {
buffer_ = buf.buffer_;
offset_ = buf.offset_;
size_ = buf.size_;
}
return *this;
}
@ -128,6 +132,10 @@ class CopyOnWriteBuffer {
RTC_DCHECK(IsConsistent());
RTC_DCHECK(buf.IsConsistent());
buffer_ = std::move(buf.buffer_);
offset_ = buf.offset_;
size_ = buf.size_;
buf.offset_ = 0;
buf.size_ = 0;
return *this;
}
@ -157,10 +165,13 @@ class CopyOnWriteBuffer {
if (!buffer_) {
buffer_ = size > 0 ? new RefCountedObject<Buffer>(data, size) : nullptr;
} else if (!buffer_->HasOneRef()) {
buffer_ = new RefCountedObject<Buffer>(data, size, buffer_->capacity());
buffer_ = new RefCountedObject<Buffer>(data, size, capacity());
} else {
buffer_->SetData(data, size);
}
offset_ = 0;
size_ = size;
RTC_DCHECK(IsConsistent());
}
@ -177,6 +188,8 @@ class CopyOnWriteBuffer {
RTC_DCHECK(buf.IsConsistent());
if (&buf != this) {
buffer_ = buf.buffer_;
offset_ = buf.offset_;
size_ = buf.size_;
}
}
@ -188,13 +201,19 @@ class CopyOnWriteBuffer {
RTC_DCHECK(IsConsistent());
if (!buffer_) {
buffer_ = new RefCountedObject<Buffer>(data, size);
offset_ = 0;
size_ = size;
RTC_DCHECK(IsConsistent());
return;
}
CloneDataIfReferenced(
std::max(buffer_->capacity(), buffer_->size() + size));
UnshareAndEnsureCapacity(std::max(capacity(), size_ + size));
buffer_->SetSize(offset_ +
size_); // Remove data to the right of the slice.
buffer_->AppendData(data, size);
size_ += size;
RTC_DCHECK(IsConsistent());
}
@ -228,18 +247,41 @@ class CopyOnWriteBuffer {
// Swaps two buffers.
friend void swap(CopyOnWriteBuffer& a, CopyOnWriteBuffer& b) {
std::swap(a.buffer_, b.buffer_);
std::swap(a.offset_, b.offset_);
std::swap(a.size_, b.size_);
}
CopyOnWriteBuffer Slice(size_t offset, size_t length) const {
CopyOnWriteBuffer slice(*this);
RTC_DCHECK_LE(offset, size_);
RTC_DCHECK_LE(length + offset, size_);
slice.offset_ += offset;
slice.size_ = length;
return slice;
}
private:
// Create a copy of the underlying data if it is referenced from other Buffer
// objects.
void CloneDataIfReferenced(size_t new_capacity);
// objects or there is not enough capacity.
void UnshareAndEnsureCapacity(size_t new_capacity);
// Pre- and postcondition of all methods.
bool IsConsistent() const { return (!buffer_ || buffer_->capacity() > 0); }
bool IsConsistent() const {
if (buffer_) {
return buffer_->capacity() > 0 && offset_ <= buffer_->size() &&
offset_ + size_ <= buffer_->size();
} else {
return size_ == 0 && offset_ == 0;
}
}
// buffer_ is either null, or points to an rtc::Buffer with capacity > 0.
scoped_refptr<RefCountedObject<Buffer>> buffer_;
// This buffer may represent a slice of a original data.
size_t offset_; // Offset of a current slice in the original data in buffer_.
// Should be 0 if the buffer_ is empty.
size_t size_; // Size of a current slice in the original data in buffer_.
// Should be 0 if the buffer_ is empty.
};
} // namespace rtc

View File

@ -319,4 +319,50 @@ TEST(CopyOnWriteBufferTest, TestBacketWrite) {
EXPECT_EQ(0, memcmp(buf2.cdata(), kTestData, 3));
}
TEST(CopyOnWriteBufferTest, CreateSlice) {
CopyOnWriteBuffer buf(kTestData, 10, 10);
CopyOnWriteBuffer slice = buf.Slice(3, 4);
EXPECT_EQ(slice.size(), 4u);
EXPECT_EQ(0, memcmp(buf.cdata() + 3, slice.cdata(), 4));
}
TEST(CopyOnWriteBufferTest, NoCopyDataOnSlice) {
CopyOnWriteBuffer buf(kTestData, 10, 10);
CopyOnWriteBuffer slice = buf.Slice(3, 4);
EXPECT_EQ(buf.cdata() + 3, slice.cdata());
}
TEST(CopyOnWriteBufferTest, WritingCopiesData) {
CopyOnWriteBuffer buf(kTestData, 10, 10);
CopyOnWriteBuffer slice = buf.Slice(3, 4);
slice[0] = 0xaa;
EXPECT_NE(buf.cdata() + 3, slice.cdata());
EXPECT_EQ(0, memcmp(buf.cdata(), kTestData, 10));
}
TEST(CopyOnWriteBufferTest, WritingToBufferDoesntAffectsSlice) {
CopyOnWriteBuffer buf(kTestData, 10, 10);
CopyOnWriteBuffer slice = buf.Slice(3, 4);
buf[0] = 0xaa;
EXPECT_NE(buf.cdata() + 3, slice.cdata());
EXPECT_EQ(0, memcmp(slice.cdata(), kTestData + 3, 4));
}
TEST(CopyOnWriteBufferTest, SliceOfASlice) {
CopyOnWriteBuffer buf(kTestData, 10, 10);
CopyOnWriteBuffer slice = buf.Slice(3, 7);
CopyOnWriteBuffer slice2 = slice.Slice(2, 3);
EXPECT_EQ(slice2.size(), 3u);
EXPECT_EQ(slice.cdata() + 2, slice2.cdata());
EXPECT_EQ(buf.cdata() + 5, slice2.cdata());
}
TEST(CopyOnWriteBufferTest, SlicesAreIndependent) {
CopyOnWriteBuffer buf(kTestData, 10, 10);
CopyOnWriteBuffer slice = buf.Slice(3, 7);
CopyOnWriteBuffer slice2 = buf.Slice(3, 7);
slice2[0] = 0xaa;
EXPECT_EQ(buf.cdata() + 3, slice.cdata());
}
} // namespace rtc