Files
platform-external-webrtc/modules/congestion_controller/rtp/send_side_congestion_controller.cc
Sebastian Jansson 4932aa185c Avoids posting tasks in congestion controller.
This CL makes calls to send side congestion controller that originates
from the task queue execute directly rather than posting a task. This
ensures that side effects are applied by the time the call returns.

This reduces the risk that the task queue version of the congestion
controller introduces races that does not exist in the process thread
based version.

Bug: webrtc:9586
Change-Id: I82de032dc971c791a0f86d20ccbd47cbb09eba4b
Reviewed-on: https://webrtc-review.googlesource.com/85360
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24382}
2018-08-22 09:47:02 +00:00

831 lines
30 KiB
C++

/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h"
#include <algorithm>
#include <functional>
#include <memory>
#include <vector>
#include "absl/memory/memory.h"
#include "api/transport/network_types.h"
#include "modules/congestion_controller/congestion_window_pushback_controller.h"
#include "modules/congestion_controller/goog_cc/include/goog_cc_factory.h"
#include "modules/remote_bitrate_estimator/include/bwe_defines.h"
#include "rtc_base/bind.h"
#include "rtc_base/checks.h"
#include "rtc_base/format_macros.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h"
#include "rtc_base/numerics/safe_minmax.h"
#include "rtc_base/rate_limiter.h"
#include "rtc_base/sequenced_task_checker.h"
#include "rtc_base/socket.h"
#include "rtc_base/timeutils.h"
#include "system_wrappers/include/field_trial.h"
#include "system_wrappers/include/runtime_enabled_features.h"
using absl::make_unique;
namespace webrtc {
namespace webrtc_cc {
namespace {
using send_side_cc_internal::PeriodicTask;
const char kCwndExperiment[] = "WebRTC-CwndExperiment";
// When CongestionWindowPushback is enabled, the pacer is oblivious to
// the congestion window. The relation between outstanding data and
// the congestion window affects encoder allocations directly.
const char kCongestionPushbackExperiment[] = "WebRTC-CongestionWindowPushback";
// When PacerPushbackExperiment is enabled, build-up in the pacer due to
// the congestion window and/or data spikes reduces encoder allocations.
const char kPacerPushbackExperiment[] = "WebRTC-PacerPushbackExperiment";
const int64_t PacerQueueUpdateIntervalMs = 25;
bool IsPacerPushbackExperimentEnabled() {
return webrtc::field_trial::IsEnabled(kPacerPushbackExperiment) ||
(!webrtc::field_trial::IsDisabled(kPacerPushbackExperiment) &&
webrtc::runtime_enabled_features::IsFeatureEnabled(
webrtc::runtime_enabled_features::kDualStreamModeFeatureName));
}
bool IsCongestionWindowPushbackExperimentEnabled() {
return webrtc::field_trial::IsEnabled(kCongestionPushbackExperiment) &&
webrtc::field_trial::IsEnabled(kCwndExperiment);
}
std::unique_ptr<CongestionWindowPushbackController>
MaybeInitalizeCongestionWindowPushbackController() {
return IsCongestionWindowPushbackExperimentEnabled()
? absl::make_unique<CongestionWindowPushbackController>()
: nullptr;
}
void SortPacketFeedbackVector(std::vector<webrtc::PacketFeedback>* input) {
std::sort(input->begin(), input->end(), PacketFeedbackComparator());
}
PacketResult NetworkPacketFeedbackFromRtpPacketFeedback(
const webrtc::PacketFeedback& pf) {
PacketResult feedback;
if (pf.arrival_time_ms == webrtc::PacketFeedback::kNotReceived)
feedback.receive_time = Timestamp::Infinity();
else
feedback.receive_time = Timestamp::ms(pf.arrival_time_ms);
if (pf.send_time_ms != webrtc::PacketFeedback::kNoSendTime) {
feedback.sent_packet = SentPacket();
feedback.sent_packet->sequence_number = pf.long_sequence_number;
feedback.sent_packet->send_time = Timestamp::ms(pf.send_time_ms);
feedback.sent_packet->size = DataSize::bytes(pf.payload_size);
feedback.sent_packet->pacing_info = pf.pacing_info;
}
return feedback;
}
std::vector<PacketResult> PacketResultsFromRtpFeedbackVector(
const std::vector<PacketFeedback>& feedback_vector) {
RTC_DCHECK(std::is_sorted(feedback_vector.begin(), feedback_vector.end(),
PacketFeedbackComparator()));
std::vector<PacketResult> packet_feedbacks;
packet_feedbacks.reserve(feedback_vector.size());
for (const PacketFeedback& rtp_feedback : feedback_vector) {
auto feedback = NetworkPacketFeedbackFromRtpPacketFeedback(rtp_feedback);
packet_feedbacks.push_back(feedback);
}
return packet_feedbacks;
}
TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
int max_bitrate_bps,
const Clock* clock) {
TargetRateConstraints msg;
msg.at_time = Timestamp::ms(clock->TimeInMilliseconds());
msg.min_data_rate =
min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero();
msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps)
: DataRate::Infinity();
return msg;
}
// The template closure pattern is based on rtc::ClosureTask.
template <class Closure>
class PeriodicTaskImpl final : public PeriodicTask {
public:
PeriodicTaskImpl(rtc::TaskQueue* task_queue,
int64_t period_ms,
Closure&& closure)
: task_queue_(task_queue),
period_ms_(period_ms),
closure_(std::forward<Closure>(closure)) {}
bool Run() override {
if (!running_)
return true;
closure_();
// absl::WrapUnique lets us repost this task on the TaskQueue.
task_queue_->PostDelayedTask(absl::WrapUnique(this), period_ms_);
// Return false to tell TaskQueue to not destruct this object, since we have
// taken ownership with absl::WrapUnique.
return false;
}
void Stop() override {
if (task_queue_->IsCurrent()) {
RTC_DCHECK(running_);
running_ = false;
} else {
task_queue_->PostTask([this] { Stop(); });
}
}
private:
rtc::TaskQueue* const task_queue_;
const int64_t period_ms_;
typename std::remove_const<
typename std::remove_reference<Closure>::type>::type closure_;
bool running_ = true;
};
template <class Closure>
static PeriodicTask* StartPeriodicTask(rtc::TaskQueue* task_queue,
int64_t period_ms,
Closure&& closure) {
auto periodic_task = absl::make_unique<PeriodicTaskImpl<Closure>>(
task_queue, period_ms, std::forward<Closure>(closure));
PeriodicTask* periodic_task_ptr = periodic_task.get();
task_queue->PostDelayedTask(std::move(periodic_task), period_ms);
return periodic_task_ptr;
}
} // namespace
namespace send_side_cc_internal {
class ControlHandler {
public:
ControlHandler(NetworkChangedObserver* observer,
PacerController* pacer_controller,
const Clock* clock);
void PostUpdates(NetworkControlUpdate update);
void OnNetworkAvailability(NetworkAvailability msg);
void OnOutstandingData(DataSize in_flight_data);
void OnPacerQueueUpdate(TimeDelta expected_queue_time);
absl::optional<TargetTransferRate> last_transfer_rate();
private:
void OnNetworkInvalidation();
bool GetNetworkParameters(int32_t* estimated_bitrate_bps,
uint8_t* fraction_loss,
int64_t* rtt_ms);
bool IsSendQueueFull() const;
bool HasNetworkParametersToReportChanged(int64_t bitrate_bps,
uint8_t fraction_loss,
int64_t rtt);
NetworkChangedObserver* observer_ = nullptr;
PacerController* pacer_controller_;
absl::optional<TargetTransferRate> current_target_rate_msg_;
bool network_available_ = true;
int64_t last_reported_target_bitrate_bps_ = 0;
uint8_t last_reported_fraction_loss_ = 0;
int64_t last_reported_rtt_ms_ = 0;
const bool pacer_pushback_experiment_ = false;
uint32_t min_pushback_target_bitrate_bps_;
int64_t pacer_expected_queue_ms_ = 0;
double encoding_rate_ratio_ = 1.0;
const std::unique_ptr<CongestionWindowPushbackController>
congestion_window_pushback_controller_;
rtc::SequencedTaskChecker sequenced_checker_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(ControlHandler);
};
ControlHandler::ControlHandler(NetworkChangedObserver* observer,
PacerController* pacer_controller,
const Clock* clock)
: observer_(observer),
pacer_controller_(pacer_controller),
pacer_pushback_experiment_(IsPacerPushbackExperimentEnabled()),
congestion_window_pushback_controller_(
MaybeInitalizeCongestionWindowPushbackController()) {
sequenced_checker_.Detach();
}
void ControlHandler::PostUpdates(NetworkControlUpdate update) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
if (update.congestion_window) {
if (congestion_window_pushback_controller_) {
congestion_window_pushback_controller_->SetDataWindow(
update.congestion_window.value());
} else {
pacer_controller_->OnCongestionWindow(*update.congestion_window);
}
}
if (update.pacer_config) {
pacer_controller_->OnPacerConfig(*update.pacer_config);
}
for (const auto& probe : update.probe_cluster_configs) {
pacer_controller_->OnProbeClusterConfig(probe);
}
if (update.target_rate) {
current_target_rate_msg_ = *update.target_rate;
OnNetworkInvalidation();
}
}
void ControlHandler::OnNetworkAvailability(NetworkAvailability msg) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
network_available_ = msg.network_available;
OnNetworkInvalidation();
}
void ControlHandler::OnOutstandingData(DataSize in_flight_data) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
if (congestion_window_pushback_controller_) {
congestion_window_pushback_controller_->UpdateOutstandingData(
in_flight_data.bytes());
}
OnNetworkInvalidation();
}
void ControlHandler::OnPacerQueueUpdate(TimeDelta expected_queue_time) {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
pacer_expected_queue_ms_ = expected_queue_time.ms();
OnNetworkInvalidation();
}
void ControlHandler::OnNetworkInvalidation() {
if (!current_target_rate_msg_.has_value())
return;
uint32_t target_bitrate_bps = current_target_rate_msg_->target_rate.bps();
int64_t rtt_ms =
current_target_rate_msg_->network_estimate.round_trip_time.ms();
float loss_rate_ratio =
current_target_rate_msg_->network_estimate.loss_rate_ratio;
int loss_ratio_255 = loss_rate_ratio * 255;
uint8_t fraction_loss =
rtc::dchecked_cast<uint8_t>(rtc::SafeClamp(loss_ratio_255, 0, 255));
int64_t probing_interval_ms =
current_target_rate_msg_->network_estimate.bwe_period.ms();
if (!network_available_) {
target_bitrate_bps = 0;
} else if (congestion_window_pushback_controller_) {
target_bitrate_bps =
congestion_window_pushback_controller_->UpdateTargetBitrate(
target_bitrate_bps);
} else if (!pacer_pushback_experiment_) {
target_bitrate_bps = IsSendQueueFull() ? 0 : target_bitrate_bps;
} else {
int64_t queue_length_ms = pacer_expected_queue_ms_;
if (queue_length_ms == 0) {
encoding_rate_ratio_ = 1.0;
} else if (queue_length_ms > 50) {
double encoding_ratio = 1.0 - queue_length_ms / 1000.0;
encoding_rate_ratio_ = std::min(encoding_rate_ratio_, encoding_ratio);
encoding_rate_ratio_ = std::max(encoding_rate_ratio_, 0.0);
}
target_bitrate_bps *= encoding_rate_ratio_;
target_bitrate_bps = target_bitrate_bps < 50000 ? 0 : target_bitrate_bps;
}
if (HasNetworkParametersToReportChanged(target_bitrate_bps, fraction_loss,
rtt_ms)) {
observer_->OnNetworkChanged(target_bitrate_bps, fraction_loss, rtt_ms,
probing_interval_ms);
}
}
bool ControlHandler::HasNetworkParametersToReportChanged(
int64_t target_bitrate_bps,
uint8_t fraction_loss,
int64_t rtt_ms) {
bool changed = last_reported_target_bitrate_bps_ != target_bitrate_bps ||
(target_bitrate_bps > 0 &&
(last_reported_fraction_loss_ != fraction_loss ||
last_reported_rtt_ms_ != rtt_ms));
if (changed &&
(last_reported_target_bitrate_bps_ == 0 || target_bitrate_bps == 0)) {
RTC_LOG(LS_INFO) << "Bitrate estimate state changed, BWE: "
<< target_bitrate_bps << " bps.";
}
last_reported_target_bitrate_bps_ = target_bitrate_bps;
last_reported_fraction_loss_ = fraction_loss;
last_reported_rtt_ms_ = rtt_ms;
return changed;
}
bool ControlHandler::IsSendQueueFull() const {
return pacer_expected_queue_ms_ > PacedSender::kMaxQueueLengthMs;
}
absl::optional<TargetTransferRate> ControlHandler::last_transfer_rate() {
RTC_DCHECK_CALLED_SEQUENTIALLY(&sequenced_checker_);
return current_target_rate_msg_;
}
} // namespace send_side_cc_internal
SendSideCongestionController::SendSideCongestionController(
const Clock* clock,
rtc::TaskQueue* task_queue,
RtcEventLog* event_log,
PacedSender* pacer,
int start_bitrate_bps,
int min_bitrate_bps,
int max_bitrate_bps,
NetworkControllerFactoryInterface* controller_factory)
: clock_(clock),
pacer_(pacer),
transport_feedback_adapter_(clock_),
controller_factory_with_feedback_(controller_factory),
controller_factory_fallback_(
absl::make_unique<GoogCcNetworkControllerFactory>(event_log)),
pacer_controller_(absl::make_unique<PacerController>(pacer_)),
process_interval_(controller_factory_fallback_->GetProcessInterval()),
last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())),
observer_(nullptr),
send_side_bwe_with_overhead_(
webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")),
transport_overhead_bytes_per_packet_(0),
network_available_(false),
periodic_tasks_enabled_(true),
packet_feedback_available_(false),
pacer_queue_update_task_(nullptr),
controller_task_(nullptr),
task_queue_(task_queue) {
initial_config_.constraints =
ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
RTC_DCHECK(start_bitrate_bps > 0);
initial_config_.starting_bandwidth = DataRate::bps(start_bitrate_bps);
}
// There is no point in having a network controller for a network that is not
// yet available and if we don't have any observer of it's state.
// MaybeCreateControllers is used to trigger creation if those things are
// fulfilled. This is needed since dependent code uses the period until network
// is signalled to be avaliabile to set the expected start bitrate which is sent
// to the initializer for NetworkControllers. The observer is injected later due
// to a circular dependency between RtpTransportControllerSend and Call.
// TODO(srte): Break the circular dependency issue and make sure that starting
// bandwidth is set before this class is initialized so the controllers can be
// created in the constructor.
void SendSideCongestionController::MaybeCreateControllers() {
if (!controller_)
MaybeRecreateControllers();
}
void SendSideCongestionController::MaybeRecreateControllers() {
if (!network_available_ || !observer_)
return;
if (!control_handler_) {
control_handler_ = absl::make_unique<send_side_cc_internal::ControlHandler>(
observer_, pacer_controller_.get(), clock_);
}
initial_config_.constraints.at_time =
Timestamp::ms(clock_->TimeInMilliseconds());
initial_config_.stream_based_config = streams_config_;
if (!controller_) {
// TODO(srte): Use fallback controller if no feedback is available.
if (controller_factory_with_feedback_) {
RTC_LOG(LS_INFO) << "Creating feedback based only controller";
controller_ = controller_factory_with_feedback_->Create(initial_config_);
process_interval_ =
controller_factory_with_feedback_->GetProcessInterval();
} else {
RTC_LOG(LS_INFO) << "Creating fallback controller";
controller_ = controller_factory_fallback_->Create(initial_config_);
process_interval_ = controller_factory_fallback_->GetProcessInterval();
}
UpdateControllerWithTimeInterval();
StartProcessPeriodicTasks();
}
RTC_DCHECK(controller_);
}
SendSideCongestionController::~SendSideCongestionController() = default;
void SendSideCongestionController::RegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
transport_feedback_adapter_.RegisterPacketFeedbackObserver(observer);
}
void SendSideCongestionController::DeRegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
transport_feedback_adapter_.DeRegisterPacketFeedbackObserver(observer);
}
void SendSideCongestionController::RegisterNetworkObserver(
NetworkChangedObserver* observer) {
task_queue_->PostTask([this, observer]() {
RTC_DCHECK_RUN_ON(task_queue_);
RTC_DCHECK(observer_ == nullptr);
observer_ = observer;
MaybeCreateControllers();
});
}
void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps,
int start_bitrate_bps,
int max_bitrate_bps) {
TargetRateConstraints constraints =
ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
task_queue_->PostTask([this, constraints, start_bitrate_bps]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_) {
control_handler_->PostUpdates(
controller_->OnTargetRateConstraints(constraints));
} else {
initial_config_.constraints = constraints;
if (start_bitrate_bps > 0)
initial_config_.starting_bandwidth = DataRate::bps(start_bitrate_bps);
}
});
}
void SendSideCongestionController::SetAllocatedSendBitrateLimits(
int64_t min_send_bitrate_bps,
int64_t max_padding_bitrate_bps,
int64_t max_total_bitrate_bps) {
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps);
streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps);
streams_config_.max_total_allocated_bitrate =
DataRate::bps(max_total_bitrate_bps);
UpdateStreamsConfig();
}
// TODO(holmer): Split this up and use SetBweBitrates in combination with
// OnNetworkRouteChanged.
void SendSideCongestionController::OnNetworkRouteChanged(
const rtc::NetworkRoute& network_route,
int start_bitrate_bps,
int min_bitrate_bps,
int max_bitrate_bps) {
transport_feedback_adapter_.SetNetworkIds(network_route.local_network_id,
network_route.remote_network_id);
transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
NetworkRouteChange msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.constraints =
ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
if (start_bitrate_bps > 0)
msg.starting_rate = DataRate::bps(start_bitrate_bps);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg));
} else {
if (msg.starting_rate)
initial_config_.starting_bandwidth = *msg.starting_rate;
initial_config_.constraints = msg.constraints;
}
pacer_controller_->OnNetworkRouteChange(msg);
});
}
bool SendSideCongestionController::AvailableBandwidth(
uint32_t* bandwidth) const {
// This is only called in the OnNetworkChanged callback in
// RtpTransportControllerSend which is called from ControlHandler, which is
// running on the task queue.
// TODO(srte): Remove this function when RtpTransportControllerSend stops
// calling it.
RTC_DCHECK_RUN_ON(task_queue_);
if (!control_handler_) {
return false;
}
// TODO(srte): Remove this interface and push information about bandwidth
// estimation to users of this class, thereby reducing synchronous calls.
if (control_handler_->last_transfer_rate().has_value()) {
*bandwidth = control_handler_->last_transfer_rate()
->network_estimate.bandwidth.bps();
return true;
}
return false;
}
RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() {
return this;
}
void SendSideCongestionController::SetPerPacketFeedbackAvailable(
bool available) {
RTC_DCHECK_RUN_ON(task_queue_);
packet_feedback_available_ = available;
MaybeRecreateControllers();
}
void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) {
task_queue_->PostTask([this, enable]() {
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.requests_alr_probing = enable;
UpdateStreamsConfig();
});
}
void SendSideCongestionController::UpdateStreamsConfig() {
streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
if (controller_)
control_handler_->PostUpdates(
controller_->OnStreamsConfig(streams_config_));
}
TransportFeedbackObserver*
SendSideCongestionController::GetTransportFeedbackObserver() {
return this;
}
void SendSideCongestionController::SignalNetworkState(NetworkState state) {
RTC_LOG(LS_INFO) << "SignalNetworkState "
<< (state == kNetworkUp ? "Up" : "Down");
NetworkAvailability msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.network_available = state == kNetworkUp;
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
network_available_ = msg.network_available;
if (controller_) {
control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg));
pacer_controller_->OnNetworkAvailability(msg);
control_handler_->OnNetworkAvailability(msg);
} else {
MaybeCreateControllers();
}
});
}
void SendSideCongestionController::OnSentPacket(
const rtc::SentPacket& sent_packet) {
// We're not interested in packets without an id, which may be stun packets,
// etc, sent on the same transport.
if (sent_packet.packet_id == -1)
return;
transport_feedback_adapter_.OnSentPacket(sent_packet.packet_id,
sent_packet.send_time_ms);
MaybeUpdateOutstandingData();
auto packet = transport_feedback_adapter_.GetPacket(sent_packet.packet_id);
if (packet.has_value()) {
SentPacket msg;
msg.size = DataSize::bytes(packet->payload_size);
msg.send_time = Timestamp::ms(packet->send_time_ms);
msg.sequence_number = packet->long_sequence_number;
msg.data_in_flight =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnSentPacket(msg));
});
}
}
void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
int64_t max_rtt_ms) {
int64_t now_ms = clock_->TimeInMilliseconds();
RoundTripTimeUpdate report;
report.receive_time = Timestamp::ms(now_ms);
report.round_trip_time = TimeDelta::ms(avg_rtt_ms);
report.smoothed = true;
task_queue_->PostTask([this, report]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
});
}
int64_t SendSideCongestionController::TimeUntilNextProcess() {
// Using task queue to process, just sleep long to avoid wasting resources.
return 60 * 1000;
}
void SendSideCongestionController::Process() {
// Ignored, using task queue to process.
}
void SendSideCongestionController::StartProcessPeriodicTasks() {
if (!periodic_tasks_enabled_)
return;
if (!pacer_queue_update_task_) {
pacer_queue_update_task_ =
StartPeriodicTask(task_queue_, PacerQueueUpdateIntervalMs, [this]() {
RTC_DCHECK_RUN_ON(task_queue_);
UpdatePacerQueue();
});
}
if (controller_task_) {
// Stop is not synchronous, but is guaranteed to occur before the first
// invocation of the new controller task started below.
controller_task_->Stop();
controller_task_ = nullptr;
}
if (process_interval_.IsFinite()) {
// The controller task is owned by the task queue and lives until the task
// queue is destroyed or some time after Stop() is called, whichever comes
// first.
controller_task_ =
StartPeriodicTask(task_queue_, process_interval_.ms(), [this]() {
RTC_DCHECK_RUN_ON(task_queue_);
UpdateControllerWithTimeInterval();
});
}
}
void SendSideCongestionController::UpdateControllerWithTimeInterval() {
if (controller_) {
ProcessInterval msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
control_handler_->PostUpdates(controller_->OnProcessInterval(msg));
}
}
void SendSideCongestionController::UpdatePacerQueue() {
if (control_handler_) {
TimeDelta expected_queue_time =
TimeDelta::ms(pacer_->ExpectedQueueTimeMs());
control_handler_->OnPacerQueueUpdate(expected_queue_time);
}
}
void SendSideCongestionController::AddPacket(
uint32_t ssrc,
uint16_t sequence_number,
size_t length,
const PacedPacketInfo& pacing_info) {
if (send_side_bwe_with_overhead_) {
length += transport_overhead_bytes_per_packet_;
}
transport_feedback_adapter_.AddPacket(ssrc, sequence_number, length,
pacing_info);
}
void SendSideCongestionController::OnTransportFeedback(
const rtcp::TransportFeedback& feedback) {
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
int64_t feedback_time_ms = clock_->TimeInMilliseconds();
DataSize prior_in_flight =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
transport_feedback_adapter_.OnTransportFeedback(feedback);
MaybeUpdateOutstandingData();
std::vector<PacketFeedback> feedback_vector =
transport_feedback_adapter_.GetTransportFeedbackVector();
SortPacketFeedbackVector(&feedback_vector);
if (!feedback_vector.empty()) {
TransportPacketsFeedback msg;
msg.packet_feedbacks = PacketResultsFromRtpFeedbackVector(feedback_vector);
msg.feedback_time = Timestamp::ms(feedback_time_ms);
msg.prior_in_flight = prior_in_flight;
msg.data_in_flight =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(
controller_->OnTransportPacketsFeedback(msg));
});
}
}
void SendSideCongestionController::MaybeUpdateOutstandingData() {
DataSize in_flight_data =
DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
task_queue_->PostTask([this, in_flight_data]() {
RTC_DCHECK_RUN_ON(task_queue_);
pacer_controller_->OnOutstandingData(in_flight_data);
if (control_handler_)
control_handler_->OnOutstandingData(in_flight_data);
});
}
std::vector<PacketFeedback>
SendSideCongestionController::GetTransportFeedbackVector() const {
RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
return transport_feedback_adapter_.GetTransportFeedbackVector();
}
void SendSideCongestionController::PostPeriodicTasksForTest() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(task_queue_);
UpdateControllerWithTimeInterval();
UpdatePacerQueue();
});
}
void SendSideCongestionController::WaitOnTasksForTest() {
rtc::Event event(false, false);
task_queue_->PostTask([&event]() { event.Set(); });
event.Wait(rtc::Event::kForever);
}
void SendSideCongestionController::SetPacingFactor(float pacing_factor) {
RTC_DCHECK_RUN_ON(task_queue_);
streams_config_.pacing_factor = pacing_factor;
UpdateStreamsConfig();
}
void SendSideCongestionController::SetAllocatedBitrateWithoutFeedback(
uint32_t bitrate_bps) {}
void SendSideCongestionController::DisablePeriodicTasks() {
task_queue_->PostTask([this]() {
RTC_DCHECK_RUN_ON(task_queue_);
periodic_tasks_enabled_ = false;
});
}
void SendSideCongestionController::OnReceivedEstimatedBitrate(
uint32_t bitrate) {
RemoteBitrateReport msg;
msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.bandwidth = DataRate::bps(bitrate);
task_queue_->PostTask([this, msg]() {
RTC_DCHECK_RUN_ON(task_queue_);
if (controller_)
control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg));
});
}
void SendSideCongestionController::OnReceivedRtcpReceiverReport(
const webrtc::ReportBlockList& report_blocks,
int64_t rtt_ms,
int64_t now_ms) {
task_queue_->PostTask([this, report_blocks, now_ms]() {
RTC_DCHECK_RUN_ON(task_queue_);
OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
});
task_queue_->PostTask([this, now_ms, rtt_ms]() {
RTC_DCHECK_RUN_ON(task_queue_);
RoundTripTimeUpdate report;
report.receive_time = Timestamp::ms(now_ms);
report.round_trip_time = TimeDelta::ms(rtt_ms);
report.smoothed = false;
if (controller_)
control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
});
}
void SendSideCongestionController::OnReceivedRtcpReceiverReportBlocks(
const ReportBlockList& report_blocks,
int64_t now_ms) {
if (report_blocks.empty())
return;
int total_packets_lost_delta = 0;
int total_packets_delta = 0;
// Compute the packet loss from all report blocks.
for (const RTCPReportBlock& report_block : report_blocks) {
auto it = last_report_blocks_.find(report_block.source_ssrc);
if (it != last_report_blocks_.end()) {
auto number_of_packets = report_block.extended_highest_sequence_number -
it->second.extended_highest_sequence_number;
total_packets_delta += number_of_packets;
auto lost_delta = report_block.packets_lost - it->second.packets_lost;
total_packets_lost_delta += lost_delta;
}
last_report_blocks_[report_block.source_ssrc] = report_block;
}
// Can only compute delta if there has been previous blocks to compare to. If
// not, total_packets_delta will be unchanged and there's nothing more to do.
if (!total_packets_delta)
return;
int packets_received_delta = total_packets_delta - total_packets_lost_delta;
// To detect lost packets, at least one packet has to be received. This check
// is needed to avoid bandwith detection update in
// VideoSendStreamTest.SuspendBelowMinBitrate
if (packets_received_delta < 1)
return;
Timestamp now = Timestamp::ms(now_ms);
TransportLossReport msg;
msg.packets_lost_delta = total_packets_lost_delta;
msg.packets_received_delta = packets_received_delta;
msg.receive_time = now;
msg.start_time = last_report_block_time_;
msg.end_time = now;
if (controller_)
control_handler_->PostUpdates(controller_->OnTransportLossReport(msg));
last_report_block_time_ = now;
}
} // namespace webrtc_cc
} // namespace webrtc