Moves TransportFeedbackAdapter to TaskQueue.

Bug: webrtc:9883
Change-Id: Id87e281751d98043f4470df5a71d458f4cd654c1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158793
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30037}
This commit is contained in:
Sebastian Jansson
2019-12-06 17:35:02 +01:00
committed by Commit Bot
parent 62ea0aaea0
commit 62d01cde6f
6 changed files with 122 additions and 123 deletions

View File

@ -22,6 +22,7 @@
#include "call/rtp_video_sender.h" #include "call/rtp_video_sender.h"
#include "logging/rtc_event_log/events/rtc_event_remote_estimate.h" #include "logging/rtc_event_log/events/rtc_event_remote_estimate.h"
#include "logging/rtc_event_log/events/rtc_event_route_change.h" #include "logging/rtc_event_log/events/rtc_event_route_change.h"
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/rate_limiter.h" #include "rtc_base/rate_limiter.h"
@ -228,6 +229,7 @@ void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
} }
StreamFeedbackProvider* StreamFeedbackProvider*
RtpTransportControllerSend::GetStreamFeedbackProvider() { RtpTransportControllerSend::GetStreamFeedbackProvider() {
RTC_DCHECK_RUN_ON(&task_queue_);
return &transport_feedback_adapter_; return &transport_feedback_adapter_;
} }
@ -278,11 +280,6 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
<< " bps."; << " bps.";
RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0); RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0);
if (reset_feedback_on_route_change_)
transport_feedback_adapter_.SetNetworkIds(
network_route.local_network_id, network_route.remote_network_id);
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
if (event_log_) { if (event_log_) {
event_log_->Log(std::make_unique<RtcEventRouteChange>( event_log_->Log(std::make_unique<RtcEventRouteChange>(
network_route.connected, network_route.packet_overhead)); network_route.connected, network_route.packet_overhead));
@ -290,8 +287,13 @@ void RtpTransportControllerSend::OnNetworkRouteChanged(
NetworkRouteChange msg; NetworkRouteChange msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds()); msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.constraints = ConvertConstraints(bitrate_config, clock_); msg.constraints = ConvertConstraints(bitrate_config, clock_);
task_queue_.PostTask([this, msg] { task_queue_.PostTask([this, msg, network_route] {
RTC_DCHECK_RUN_ON(&task_queue_); RTC_DCHECK_RUN_ON(&task_queue_);
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
if (reset_feedback_on_route_change_) {
transport_feedback_adapter_.SetNetworkIds(
network_route.local_network_id, network_route.remote_network_id);
}
if (controller_) { if (controller_) {
PostUpdates(controller_->OnNetworkRouteChange(msg)); PostUpdates(controller_->OnNetworkRouteChange(msg));
} else { } else {
@ -351,17 +353,15 @@ void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
} }
void RtpTransportControllerSend::OnSentPacket( void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) { const rtc::SentPacket& sent_packet) {
absl::optional<SentPacket> packet_msg = task_queue_.PostTask([this, sent_packet]() {
transport_feedback_adapter_.ProcessSentPacket(sent_packet); RTC_DCHECK_RUN_ON(&task_queue_);
if (packet_msg) { absl::optional<SentPacket> packet_msg =
task_queue_.PostTask([this, packet_msg]() { transport_feedback_adapter_.ProcessSentPacket(sent_packet);
RTC_DCHECK_RUN_ON(&task_queue_); pacer()->UpdateOutstandingData(
if (controller_) transport_feedback_adapter_.GetOutstandingData());
PostUpdates(controller_->OnSentPacket(*packet_msg)); if (packet_msg && controller_)
}); PostUpdates(controller_->OnSentPacket(*packet_msg));
} });
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
} }
void RtpTransportControllerSend::OnReceivedPacket( void RtpTransportControllerSend::OnReceivedPacket(
@ -468,29 +468,30 @@ void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
void RtpTransportControllerSend::OnAddPacket( void RtpTransportControllerSend::OnAddPacket(
const RtpPacketSendInfo& packet_info) { const RtpPacketSendInfo& packet_info) {
transport_feedback_adapter_.AddPacket( auto creation_time = Timestamp::ms(clock_->TimeInMilliseconds());
packet_info, task_queue_.PostTask([this, packet_info, creation_time]() {
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load() RTC_DCHECK_RUN_ON(&task_queue_);
: 0, transport_feedback_adapter_.AddPacket(
Timestamp::ms(clock_->TimeInMilliseconds())); packet_info,
send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_ : 0,
creation_time);
});
} }
void RtpTransportControllerSend::OnTransportFeedback( void RtpTransportControllerSend::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) { const rtcp::TransportFeedback& feedback) {
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_); auto feedback_time = Timestamp::ms(clock_->TimeInMilliseconds());
task_queue_.PostTask([this, feedback, feedback_time]() {
absl::optional<TransportPacketsFeedback> feedback_msg = RTC_DCHECK_RUN_ON(&task_queue_);
transport_feedback_adapter_.ProcessTransportFeedback( absl::optional<TransportPacketsFeedback> feedback_msg =
feedback, Timestamp::ms(clock_->TimeInMilliseconds())); transport_feedback_adapter_.ProcessTransportFeedback(feedback,
if (feedback_msg) { feedback_time);
task_queue_.PostTask([this, feedback_msg]() { if (feedback_msg && controller_) {
RTC_DCHECK_RUN_ON(&task_queue_); PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));
if (controller_) }
PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg)); pacer()->UpdateOutstandingData(
}); transport_feedback_adapter_.GetOutstandingData());
} });
pacer()->UpdateOutstandingData(
transport_feedback_adapter_.GetOutstandingData());
} }
void RtpTransportControllerSend::OnRemoteNetworkEstimate( void RtpTransportControllerSend::OnRemoteNetworkEstimate(

View File

@ -151,7 +151,8 @@ class RtpTransportControllerSend final
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_); TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
// TODO(srte): Move all access to feedback adapter to task queue. // TODO(srte): Move all access to feedback adapter to task queue.
TransportFeedbackAdapter transport_feedback_adapter_; TransportFeedbackAdapter transport_feedback_adapter_
RTC_GUARDED_BY(task_queue_);
NetworkControllerFactoryInterface* const controller_factory_override_ NetworkControllerFactoryInterface* const controller_factory_override_
RTC_PT_GUARDED_BY(task_queue_); RTC_PT_GUARDED_BY(task_queue_);
@ -176,16 +177,13 @@ class RtpTransportControllerSend final
const bool reset_feedback_on_route_change_; const bool reset_feedback_on_route_change_;
const bool send_side_bwe_with_overhead_; const bool send_side_bwe_with_overhead_;
const bool add_pacing_to_cwin_; const bool add_pacing_to_cwin_;
// Transport overhead is written by OnNetworkRouteChanged and read by
// AddPacket. size_t transport_overhead_bytes_per_packet_ RTC_GUARDED_BY(task_queue_);
// TODO(srte): Remove atomic when feedback adapter runs on task queue.
std::atomic<size_t> transport_overhead_bytes_per_packet_;
bool network_available_ RTC_GUARDED_BY(task_queue_); bool network_available_ RTC_GUARDED_BY(task_queue_);
RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_);
RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_); RepeatingTaskHandle controller_task_ RTC_GUARDED_BY(task_queue_);
// TODO(srte): Remove this checker when feedback adapter runs on task queue.
rtc::RaceChecker worker_race_;
// Protected by internal locks.
RateLimiter retransmission_rate_limiter_; RateLimiter retransmission_rate_limiter_;
// TODO(perkj): |task_queue_| is supposed to replace |process_thread_|. // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|.

View File

@ -135,17 +135,24 @@ class RtpVideoSenderTestFixture {
VideoEncoderConfig::ContentType::kRealtimeVideo), VideoEncoderConfig::ContentType::kRealtimeVideo),
retransmission_rate_limiter_(time_controller_.GetClock(), retransmission_rate_limiter_(time_controller_.GetClock(),
kRetransmitWindowSizeMs) { kRetransmitWindowSizeMs) {
std::map<uint32_t, RtpState> suspended_ssrcs; rtc::Event done;
router_ = std::make_unique<RtpVideoSender>( transport_controller_.GetWorkerQueue()->PostTask([&]() {
time_controller_.GetClock(), suspended_ssrcs, suspended_payload_states, std::map<uint32_t, RtpState> suspended_ssrcs;
config_.rtp, config_.rtcp_report_interval_ms, &transport_,
CreateObservers(&call_stats_, &encoder_feedback_, &stats_proxy_, router_ = std::make_unique<RtpVideoSender>(
&stats_proxy_, &stats_proxy_, &stats_proxy_, time_controller_.GetClock(), suspended_ssrcs,
frame_count_observer, &stats_proxy_, &stats_proxy_, suspended_payload_states, config_.rtp,
&send_delay_stats_), config_.rtcp_report_interval_ms, &transport_,
&transport_controller_, &event_log_, &retransmission_rate_limiter_, CreateObservers(&call_stats_, &encoder_feedback_, &stats_proxy_,
std::make_unique<FecControllerDefault>(time_controller_.GetClock()), &stats_proxy_, &stats_proxy_, &stats_proxy_,
nullptr, CryptoOptions{}); frame_count_observer, &stats_proxy_, &stats_proxy_,
&send_delay_stats_),
&transport_controller_, &event_log_, &retransmission_rate_limiter_,
std::make_unique<FecControllerDefault>(time_controller_.GetClock()),
nullptr, CryptoOptions{});
done.Set();
});
done.Wait(rtc::Event::kForever);
} }
RtpVideoSenderTestFixture( RtpVideoSenderTestFixture(
const std::vector<uint32_t>& ssrcs, const std::vector<uint32_t>& ssrcs,
@ -157,7 +164,14 @@ class RtpVideoSenderTestFixture {
payload_type, payload_type,
suspended_payload_states, suspended_payload_states,
/*frame_count_observer=*/nullptr) {} /*frame_count_observer=*/nullptr) {}
~RtpVideoSenderTestFixture() {
rtc::Event done;
transport_controller_.GetWorkerQueue()->PostTask([&]() {
router_.reset();
done.Set();
});
done.Wait(rtc::Event::kForever);
}
RtpVideoSender* router() { return router_.get(); } RtpVideoSender* router() { return router_.get(); }
MockTransport& transport() { return transport_; } MockTransport& transport() { return transport_; }
void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); } void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); }

View File

@ -155,8 +155,8 @@ TEST_F(BbrNetworkControllerTest, UpdatesTargetSendRate) {
ret_net->UpdateConfig( ret_net->UpdateConfig(
[](NetworkSimulationConfig* c) { c->delay = TimeDelta::ms(200); }); [](NetworkSimulationConfig* c) { c->delay = TimeDelta::ms(200); });
s.RunFor(TimeDelta::seconds(40)); s.RunFor(TimeDelta::seconds(35));
EXPECT_NEAR(client->send_bandwidth().kbps(), 200, 40); EXPECT_NEAR(client->send_bandwidth().kbps(), 180, 50);
} }
} // namespace test } // namespace test

View File

@ -73,7 +73,6 @@ TransportFeedbackAdapter::~TransportFeedbackAdapter() {
void TransportFeedbackAdapter::RegisterStreamFeedbackObserver( void TransportFeedbackAdapter::RegisterStreamFeedbackObserver(
std::vector<uint32_t> ssrcs, std::vector<uint32_t> ssrcs,
StreamFeedbackObserver* observer) { StreamFeedbackObserver* observer) {
rtc::CritScope cs(&observers_lock_);
RTC_DCHECK(observer); RTC_DCHECK(observer);
RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) { RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) {
return pair.second == observer; return pair.second == observer;
@ -83,7 +82,6 @@ void TransportFeedbackAdapter::RegisterStreamFeedbackObserver(
void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver( void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver(
StreamFeedbackObserver* observer) { StreamFeedbackObserver* observer) {
rtc::CritScope cs(&observers_lock_);
RTC_DCHECK(observer); RTC_DCHECK(observer);
const auto it = absl::c_find_if( const auto it = absl::c_find_if(
observers_, [=](const auto& pair) { return pair.second == observer; }); observers_, [=](const auto& pair) { return pair.second == observer; });
@ -94,35 +92,31 @@ void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver(
void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info, void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
size_t overhead_bytes, size_t overhead_bytes,
Timestamp creation_time) { Timestamp creation_time) {
{ PacketFeedback packet;
rtc::CritScope cs(&lock_); packet.creation_time = creation_time;
PacketFeedback packet; packet.sent.sequence_number =
packet.creation_time = creation_time; seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number);
packet.sent.sequence_number = packet.sent.size = DataSize::bytes(packet_info.length + overhead_bytes);
seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number); packet.local_net_id = local_net_id_;
packet.sent.size = DataSize::bytes(packet_info.length + overhead_bytes); packet.remote_net_id = remote_net_id_;
packet.local_net_id = local_net_id_; packet.sent.pacing_info = packet_info.pacing_info;
packet.remote_net_id = remote_net_id_; if (packet_info.has_rtp_sequence_number) {
packet.sent.pacing_info = packet_info.pacing_info; packet.ssrc = packet_info.ssrc;
if (packet_info.has_rtp_sequence_number) { packet.rtp_sequence_number = packet_info.rtp_sequence_number;
packet.ssrc = packet_info.ssrc;
packet.rtp_sequence_number = packet_info.rtp_sequence_number;
}
while (!history_.empty() &&
creation_time - history_.begin()->second.creation_time >
kSendTimeHistoryWindow) {
// TODO(sprang): Warn if erasing (too many) old items?
if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_)
in_flight_.RemoveInFlightPacketBytes(history_.begin()->second);
history_.erase(history_.begin());
}
history_.insert(std::make_pair(packet.sent.sequence_number, packet));
} }
while (!history_.empty() &&
creation_time - history_.begin()->second.creation_time >
kSendTimeHistoryWindow) {
// TODO(sprang): Warn if erasing (too many) old items?
if (history_.begin()->second.sent.sequence_number > last_ack_seq_num_)
in_flight_.RemoveInFlightPacketBytes(history_.begin()->second);
history_.erase(history_.begin());
}
history_.insert(std::make_pair(packet.sent.sequence_number, packet));
} }
absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket( absl::optional<SentPacket> TransportFeedbackAdapter::ProcessSentPacket(
const rtc::SentPacket& sent_packet) { const rtc::SentPacket& sent_packet) {
rtc::CritScope cs(&lock_);
auto send_time = Timestamp::ms(sent_packet.send_time_ms); auto send_time = Timestamp::ms(sent_packet.send_time_ms);
// TODO(srte): Only use one way to indicate that packet feedback is used. // TODO(srte): Only use one way to indicate that packet feedback is used.
if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) { if (sent_packet.info.included_in_feedback || sent_packet.packet_id != -1) {
@ -171,41 +165,37 @@ TransportFeedbackAdapter::ProcessTransportFeedback(
std::vector<PacketFeedback> feedback_vector; std::vector<PacketFeedback> feedback_vector;
TransportPacketsFeedback msg; TransportPacketsFeedback msg;
msg.feedback_time = feedback_receive_time; msg.feedback_time = feedback_receive_time;
{ msg.prior_in_flight =
rtc::CritScope cs(&lock_); in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
msg.prior_in_flight = feedback_vector =
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); ProcessTransportFeedbackInner(feedback, feedback_receive_time);
feedback_vector = if (feedback_vector.empty())
ProcessTransportFeedbackInner(feedback, feedback_receive_time); return absl::nullopt;
if (feedback_vector.empty())
return absl::nullopt;
for (const PacketFeedback& fb : feedback_vector) { for (const PacketFeedback& fb : feedback_vector) {
PacketResult res; PacketResult res;
res.sent_packet = fb.sent; res.sent_packet = fb.sent;
res.receive_time = fb.receive_time; res.receive_time = fb.receive_time;
msg.packet_feedbacks.push_back(res); msg.packet_feedbacks.push_back(res);
}
auto it = history_.find(last_ack_seq_num_);
if (it != history_.end()) {
msg.first_unacked_send_time = it->second.sent.send_time;
}
msg.data_in_flight =
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
} }
auto it = history_.find(last_ack_seq_num_);
if (it != history_.end()) {
msg.first_unacked_send_time = it->second.sent.send_time;
}
msg.data_in_flight =
in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
SignalObservers(feedback_vector); SignalObservers(feedback_vector);
return msg; return msg;
} }
void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id, void TransportFeedbackAdapter::SetNetworkIds(uint16_t local_id,
uint16_t remote_id) { uint16_t remote_id) {
rtc::CritScope cs(&lock_);
local_net_id_ = local_id; local_net_id_ = local_id;
remote_net_id_ = remote_id; remote_net_id_ = remote_id;
} }
DataSize TransportFeedbackAdapter::GetOutstandingData() const { DataSize TransportFeedbackAdapter::GetOutstandingData() const {
rtc::CritScope cs(&lock_);
return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_); return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
} }
@ -290,7 +280,6 @@ TransportFeedbackAdapter::ProcessTransportFeedbackInner(
void TransportFeedbackAdapter::SignalObservers( void TransportFeedbackAdapter::SignalObservers(
const std::vector<PacketFeedback>& feedback_vector) { const std::vector<PacketFeedback>& feedback_vector) {
rtc::CritScope cs(&observers_lock_);
for (auto& observer : observers_) { for (auto& observer : observers_) {
std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback; std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
for (const auto& packet : feedback_vector) { for (const auto& packet : feedback_vector) {

View File

@ -87,36 +87,33 @@ class TransportFeedbackAdapter : public StreamFeedbackProvider {
std::vector<PacketFeedback> ProcessTransportFeedbackInner( std::vector<PacketFeedback> ProcessTransportFeedbackInner(
const rtcp::TransportFeedback& feedback, const rtcp::TransportFeedback& feedback,
Timestamp feedback_time) RTC_RUN_ON(&lock_); Timestamp feedback_time);
void SignalObservers( void SignalObservers(
const std::vector<PacketFeedback>& packet_feedback_vector); const std::vector<PacketFeedback>& packet_feedback_vector);
rtc::CriticalSection lock_; DataSize pending_untracked_size_ = DataSize::Zero();
DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero(); Timestamp last_send_time_ = Timestamp::MinusInfinity();
Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); Timestamp last_untracked_send_time_ = Timestamp::MinusInfinity();
Timestamp last_untracked_send_time_ RTC_GUARDED_BY(&lock_) = SequenceNumberUnwrapper seq_num_unwrapper_;
Timestamp::MinusInfinity(); std::map<int64_t, PacketFeedback> history_;
SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
std::map<int64_t, PacketFeedback> history_ RTC_GUARDED_BY(&lock_);
// Sequence numbers are never negative, using -1 as it always < a real // Sequence numbers are never negative, using -1 as it always < a real
// sequence number. // sequence number.
int64_t last_ack_seq_num_ RTC_GUARDED_BY(&lock_) = -1; int64_t last_ack_seq_num_ = -1;
InFlightBytesTracker in_flight_ RTC_GUARDED_BY(&lock_); InFlightBytesTracker in_flight_;
Timestamp current_offset_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity(); Timestamp current_offset_ = Timestamp::MinusInfinity();
TimeDelta last_timestamp_ RTC_GUARDED_BY(&lock_) = TimeDelta::MinusInfinity(); TimeDelta last_timestamp_ = TimeDelta::MinusInfinity();
uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0; uint16_t local_net_id_ = 0;
uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0; uint16_t remote_net_id_ = 0;
rtc::CriticalSection observers_lock_;
// Maps a set of ssrcs to corresponding observer. Vectors are used rather than // Maps a set of ssrcs to corresponding observer. Vectors are used rather than
// set/map to ensure that the processing order is consistent independently of // set/map to ensure that the processing order is consistent independently of
// the randomized ssrcs. // the randomized ssrcs.
std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>> std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
observers_ RTC_GUARDED_BY(&observers_lock_); observers_;
}; };
} // namespace webrtc } // namespace webrtc