Change SetLocalContent in channel classes to avoid Invoke.

With these changes, we now often have 0 invokes and at most 1 when
calling SetLocalContent on a channel. Before we had at least 1 and
typically 2.

Summary of changes.
* Updating RtpExtension::DeduplicateHeaderExtensions to return a sorted
  vector (+test) for easy detection of changes.
* Before updating the transport on the network thread, detect if
  actual changes to the demuxer criteria or changes to the rtp header
  extensions have been made.
* Consolidate both transport updates to a single call instead of two.
* Added DCHECK guards to catch regressions in number of invokes.

A possible upcoming improvement is to update the transport
asynchronously.

Bug: webrtc:13536
Change-Id: I71ef7b181635a796ffa1e3a02a0f661d28a4870c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/244700
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35638}
This commit is contained in:
Tomas Gunnarsson
2022-01-06 12:36:04 +00:00
committed by WebRTC LUCI CQ
parent ac72dda645
commit c69453d93b
5 changed files with 142 additions and 50 deletions

View File

@ -11,6 +11,7 @@
#include <algorithm>
#include <string>
#include <tuple>
#include <utility>
#include "api/array_view.h"
@ -280,6 +281,14 @@ const std::vector<RtpExtension> RtpExtension::DeduplicateHeaderExtensions(
}
}
// Sort the returned vector to make comparisons of header extensions reliable.
// In order of priority, we sort by uri first, then encrypt and id last.
std::sort(filtered.begin(), filtered.end(),
[](const RtpExtension& a, const RtpExtension& b) {
return std::tie(a.uri, a.encrypt, a.id) <
std::tie(b.uri, b.encrypt, b.id);
});
return filtered;
}
} // namespace webrtc

View File

@ -286,6 +286,9 @@ struct RTC_EXPORT RtpExtension {
bool encrypt);
// Returns a list of extensions where any extension URI is unique.
// The returned list will be sorted by uri first, then encrypt and id last.
// Having the list sorted allows the caller fo compare filtered lists for
// equality to detect when changes have been made.
static const std::vector<RtpExtension> DeduplicateHeaderExtensions(
const std::vector<RtpExtension>& extensions,
Filter filter);

View File

@ -109,6 +109,38 @@ TEST(RtpExtensionTest, DeduplicateHeaderExtensions) {
EXPECT_EQ((std::vector<RtpExtension>{kExtension1Encrypted}), filtered);
}
// Test that the filtered vector is sorted so that for a given unsorted array of
// extensions, the filtered vector will always be laied out the same (for easy
// comparison).
TEST(RtpExtensionTest, DeduplicateHeaderExtensionsSorted) {
const std::vector<RtpExtension> extensions = {
RtpExtension("cde1", 11, false), RtpExtension("cde2", 12, true),
RtpExtension("abc1", 3, false), RtpExtension("abc2", 4, true),
RtpExtension("cde3", 9, true), RtpExtension("cde4", 10, false),
RtpExtension("abc3", 1, true), RtpExtension("abc4", 2, false),
RtpExtension("bcd3", 7, false), RtpExtension("bcd1", 8, true),
RtpExtension("bcd2", 5, true), RtpExtension("bcd4", 6, false),
};
auto encrypted = RtpExtension::DeduplicateHeaderExtensions(
extensions, RtpExtension::Filter::kRequireEncryptedExtension);
const std::vector<RtpExtension> expected_sorted_encrypted = {
RtpExtension("abc2", 4, true), RtpExtension("abc3", 1, true),
RtpExtension("bcd1", 8, true), RtpExtension("bcd2", 5, true),
RtpExtension("cde2", 12, true), RtpExtension("cde3", 9, true)};
EXPECT_EQ(expected_sorted_encrypted, encrypted);
auto unencypted = RtpExtension::DeduplicateHeaderExtensions(
extensions, RtpExtension::Filter::kDiscardEncryptedExtension);
const std::vector<RtpExtension> expected_sorted_unencrypted = {
RtpExtension("abc1", 3, false), RtpExtension("abc4", 2, false),
RtpExtension("bcd3", 7, false), RtpExtension("bcd4", 6, false),
RtpExtension("cde1", 11, false), RtpExtension("cde4", 10, false)};
EXPECT_EQ(expected_sorted_unencrypted, unencypted);
}
TEST(RtpExtensionTest, FindHeaderExtensionByUriAndEncryption) {
std::vector<RtpExtension> extensions;

View File

@ -220,6 +220,11 @@ bool BaseChannel::SetRtpTransport(webrtc::RtpTransportInternal* rtp_transport) {
if (rtp_transport_) {
DisconnectFromRtpTransport_n();
// Clear the cached header extensions on the worker.
worker_thread_->PostTask(ToQueuedTask(alive_, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
rtp_header_extensions_.clear();
}));
}
rtp_transport_ = rtp_transport;
@ -457,18 +462,38 @@ void BaseChannel::OnRtpPacket(const webrtc::RtpPacketReceived& parsed_packet) {
packet_time.IsMinusInfinity() ? -1 : packet_time.us());
}
void BaseChannel::UpdateRtpHeaderExtensionMap(
const RtpHeaderExtensions& header_extensions) {
// Update the header extension map on network thread in case there is data
// race.
//
// NOTE: This doesn't take the BUNDLE case in account meaning the RTP header
// extension maps are not merged when BUNDLE is enabled. This is fine because
// the ID for MID should be consistent among all the RTP transports.
network_thread_->Invoke<void>(RTC_FROM_HERE, [this, &header_extensions] {
void BaseChannel::MaybeUpdateDemuxerAndRtpExtensions_w(
bool update_demuxer,
absl::optional<RtpHeaderExtensions> extensions) {
if (extensions) {
if (rtp_header_extensions_ == extensions) {
extensions.reset(); // No need to update header extensions.
} else {
rtp_header_extensions_ = *extensions;
}
}
if (!update_demuxer && !extensions)
return;
// TODO(bugs.webrtc.org/13536): See if we can do this asynchronously.
if (update_demuxer)
media_channel()->OnDemuxerCriteriaUpdatePending();
network_thread()->Invoke<void>(RTC_FROM_HERE, [&]() mutable {
RTC_DCHECK_RUN_ON(network_thread());
rtp_transport_->UpdateRtpHeaderExtensionMap(header_extensions);
// NOTE: This doesn't take the BUNDLE case in account meaning the RTP header
// extension maps are not merged when BUNDLE is enabled. This is fine
// because the ID for MID should be consistent among all the RTP transports.
if (extensions)
rtp_transport_->UpdateRtpHeaderExtensionMap(*extensions);
if (update_demuxer)
rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria_, this);
});
if (update_demuxer)
media_channel()->OnDemuxerCriteriaUpdateComplete();
}
bool BaseChannel::RegisterRtpDemuxerSink_w() {
@ -747,14 +772,17 @@ RtpHeaderExtensions BaseChannel::GetDeduplicatedRtpHeaderExtensions(
extensions_filter_);
}
void BaseChannel::MaybeAddHandledPayloadType(int payload_type) {
bool BaseChannel::MaybeAddHandledPayloadType(int payload_type) {
bool demuxer_criteria_modified = false;
if (payload_type_demuxing_enabled_) {
demuxer_criteria_.payload_types().insert(
static_cast<uint8_t>(payload_type));
demuxer_criteria_modified = demuxer_criteria_.payload_types()
.insert(static_cast<uint8_t>(payload_type))
.second;
}
// Even if payload type demuxing is currently disabled, we need to remember
// the payload types in case it's re-enabled later.
payload_types_.insert(static_cast<uint8_t>(payload_type));
return demuxer_criteria_modified;
}
bool BaseChannel::ClearHandledPayloadTypes() {
@ -813,18 +841,18 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
SdpType type,
std::string& error_desc) {
TRACE_EVENT0("webrtc", "VoiceChannel::SetLocalContent_w");
RTC_LOG(LS_INFO) << "Setting local voice description for " << ToString();
RTC_DLOG(LS_INFO) << "Setting local voice description for " << ToString();
RtpHeaderExtensions rtp_header_extensions =
RTC_LOG_THREAD_BLOCK_COUNT();
RtpHeaderExtensions header_extensions =
GetDeduplicatedRtpHeaderExtensions(content->rtp_header_extensions());
// TODO(tommi): There's a hop to the network thread here.
// some of the below is also network thread related.
UpdateRtpHeaderExtensionMap(rtp_header_extensions);
bool update_header_extensions = true;
media_channel()->SetExtmapAllowMixed(content->extmap_allow_mixed());
AudioRecvParameters recv_params = last_recv_params_;
RtpParametersFromMediaDescription(
content->as_audio(), rtp_header_extensions,
content->as_audio(), header_extensions,
webrtc::RtpTransceiverDirectionHasRecv(content->direction()),
&recv_params);
@ -836,24 +864,17 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
return false;
}
bool criteria_modified = false;
if (webrtc::RtpTransceiverDirectionHasRecv(content->direction())) {
for (const AudioCodec& codec : content->as_audio()->codecs()) {
MaybeAddHandledPayloadType(codec.id);
}
// Need to re-register the sink to update the handled payload.
if (!RegisterRtpDemuxerSink_w()) {
error_desc = StringFormat("Failed to set up audio demuxing for mid='%s'.",
content_name().c_str());
return false;
if (MaybeAddHandledPayloadType(codec.id)) {
criteria_modified = true;
}
}
}
last_recv_params_ = recv_params;
// TODO(pthatcher): Move local streams into AudioSendParameters, and
// only give it to the media channel once we have a remote
// description too (without a remote description, we won't be able
// to send them anyway).
if (!UpdateLocalStreams_w(content->as_audio()->streams(), type, error_desc)) {
RTC_DCHECK(!error_desc.empty());
return false;
@ -861,6 +882,17 @@ bool VoiceChannel::SetLocalContent_w(const MediaContentDescription* content,
set_local_content_direction(content->direction());
UpdateMediaSendRecvState_w();
RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0);
MaybeUpdateDemuxerAndRtpExtensions_w(
criteria_modified,
update_header_extensions
? absl::optional<RtpHeaderExtensions>(std::move(header_extensions))
: absl::nullopt);
RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1);
return true;
}
@ -931,17 +963,19 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
SdpType type,
std::string& error_desc) {
TRACE_EVENT0("webrtc", "VideoChannel::SetLocalContent_w");
RTC_LOG(LS_INFO) << "Setting local video description for " << ToString();
RTC_DLOG(LS_INFO) << "Setting local video description for " << ToString();
RtpHeaderExtensions rtp_header_extensions =
RTC_LOG_THREAD_BLOCK_COUNT();
RtpHeaderExtensions header_extensions =
GetDeduplicatedRtpHeaderExtensions(content->rtp_header_extensions());
UpdateRtpHeaderExtensionMap(rtp_header_extensions);
bool update_header_extensions = true;
media_channel()->SetExtmapAllowMixed(content->extmap_allow_mixed());
VideoRecvParameters recv_params = last_recv_params_;
RtpParametersFromMediaDescription(
content->as_video(), rtp_header_extensions,
content->as_video(), header_extensions,
webrtc::RtpTransceiverDirectionHasRecv(content->direction()),
&recv_params);
@ -974,15 +1008,11 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
return false;
}
bool criteria_modified = false;
if (webrtc::RtpTransceiverDirectionHasRecv(content->direction())) {
for (const VideoCodec& codec : content->as_video()->codecs()) {
MaybeAddHandledPayloadType(codec.id);
}
// Need to re-register the sink to update the handled payload.
if (!RegisterRtpDemuxerSink_w()) {
error_desc = StringFormat("Failed to set up video demuxing for mid='%s'.",
content_name().c_str());
return false;
if (MaybeAddHandledPayloadType(codec.id))
criteria_modified = true;
}
}
@ -998,10 +1028,6 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
last_send_params_ = send_params;
}
// TODO(pthatcher): Move local streams into VideoSendParameters, and
// only give it to the media channel once we have a remote
// description too (without a remote description, we won't be able
// to send them anyway).
if (!UpdateLocalStreams_w(content->as_video()->streams(), type, error_desc)) {
RTC_DCHECK(!error_desc.empty());
return false;
@ -1009,6 +1035,17 @@ bool VideoChannel::SetLocalContent_w(const MediaContentDescription* content,
set_local_content_direction(content->direction());
UpdateMediaSendRecvState_w();
RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(0);
MaybeUpdateDemuxerAndRtpExtensions_w(
criteria_modified,
update_header_extensions
? absl::optional<RtpHeaderExtensions>(std::move(header_extensions))
: absl::nullopt);
RTC_DCHECK_BLOCK_COUNT_NO_MORE_THAN(1);
return true;
}

View File

@ -289,14 +289,23 @@ class BaseChannel : public ChannelInterface,
// Add `payload_type` to `demuxer_criteria_` if payload type demuxing is
// enabled.
void MaybeAddHandledPayloadType(int payload_type) RTC_RUN_ON(worker_thread());
// Returns true if the demuxer payload type changed and a re-registration
// is needed.
bool MaybeAddHandledPayloadType(int payload_type) RTC_RUN_ON(worker_thread());
// Returns true iff the demuxer payload type criteria was non-empty before
// Returns true if the demuxer payload type criteria was non-empty before
// clearing.
bool ClearHandledPayloadTypes() RTC_RUN_ON(worker_thread());
void UpdateRtpHeaderExtensionMap(
const RtpHeaderExtensions& header_extensions);
// Hops to the network thread to update the transport if an update is
// requested. If `update_demuxer` is false and `extensions` is not set, the
// function simply returns. If either of these is set, the function updates
// the transport with either or both of the demuxer criteria and the supplied
// rtp header extensions.
void MaybeUpdateDemuxerAndRtpExtensions_w(
bool update_demuxer,
absl::optional<RtpHeaderExtensions> extensions)
RTC_RUN_ON(worker_thread());
bool RegisterRtpDemuxerSink_w() RTC_RUN_ON(worker_thread());
@ -349,7 +358,9 @@ class BaseChannel : public ChannelInterface,
worker_thread()) = webrtc::RtpTransceiverDirection::kInactive;
// Cached list of payload types, used if payload type demuxing is re-enabled.
std::set<uint8_t> payload_types_ RTC_GUARDED_BY(worker_thread());
webrtc::flat_set<uint8_t> payload_types_ RTC_GUARDED_BY(worker_thread());
// A stored copy of the rtp header extensions as applied to the transport.
RtpHeaderExtensions rtp_header_extensions_ RTC_GUARDED_BY(worker_thread());
// TODO(bugs.webrtc.org/12239): Modified on worker thread, accessed
// on network thread in RegisterRtpDemuxerSink_n (called from Init_w)
webrtc::RtpDemuxerCriteria demuxer_criteria_;