Files
platform-external-webrtc/modules/remote_bitrate_estimator/test/packet_sender.cc
Erik Språng d28796209b Distinguish between missing packet and send failure.
This CL introduces three-value enum, in order to be able to distinguish
between send success, send failure, and invalid states such as missing
packet or invalid ssrc.

The behavior is unchanged in this CL, a follow-up will change the pacer
to not consume media budget on invalid states.

Bug: webrtc:8052,webrtc:8975
Change-Id: I1c9e2226f995356daa538d3d3cf44945f35e0133
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/135165
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27923}
2019-05-13 10:24:09 +00:00

534 lines
17 KiB
C++

/*
* Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/remote_bitrate_estimator/test/packet_sender.h"
#include <assert.h>
#include <math.h>
#include <algorithm>
#include <cmath>
#include <limits>
#include <list>
#include <vector>
#include "modules/include/module_common_types_public.h"
#include "modules/pacing/pacer.h"
#include "modules/remote_bitrate_estimator/test/bbr_paced_sender.h"
#include "modules/remote_bitrate_estimator/test/bwe.h"
#include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
#include "modules/remote_bitrate_estimator/test/metric_recorder.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "rtc_base/checks.h"
namespace webrtc {
namespace testing {
namespace bwe {
namespace {
const float kPaceMultiplier = 2.5f;
}
uint32_t PacketSender::TargetBitrateKbps() {
return 0;
}
void PacketSender::Pause() {
running_ = false;
if (metric_recorder_ != nullptr) {
metric_recorder_->PauseFlow();
}
}
void PacketSender::Resume(int64_t paused_time_ms) {
running_ = true;
if (metric_recorder_ != nullptr) {
metric_recorder_->ResumeFlow(paused_time_ms);
}
}
void PacketSender::set_metric_recorder(MetricRecorder* metric_recorder) {
metric_recorder_ = metric_recorder;
}
void PacketSender::RecordBitrate() {
if (metric_recorder_ != nullptr) {
BWE_TEST_LOGGING_CONTEXT("Sender");
BWE_TEST_LOGGING_CONTEXT(*flow_ids().begin());
metric_recorder_->UpdateTimeMs(clock_.TimeInMilliseconds());
metric_recorder_->UpdateSendingEstimateKbps(TargetBitrateKbps());
}
}
std::list<FeedbackPacket*> GetFeedbackPackets(Packets* in_out,
int64_t end_time_ms,
int flow_id) {
std::list<FeedbackPacket*> fb_packets;
for (auto it = in_out->begin(); it != in_out->end();) {
if ((*it)->send_time_us() > 1000 * end_time_ms)
break;
if ((*it)->GetPacketType() == Packet::kFeedback &&
flow_id == (*it)->flow_id()) {
fb_packets.push_back(static_cast<FeedbackPacket*>(*it));
it = in_out->erase(it);
} else {
++it;
}
}
return fb_packets;
}
VideoSender::VideoSender(PacketProcessorListener* listener,
VideoSource* source,
BandwidthEstimatorType estimator_type)
: PacketSender(listener, source->flow_id()),
source_(source),
bwe_(CreateBweSender(estimator_type,
source_->bits_per_second() / 1000,
this,
&clock_)),
previous_sending_bitrate_(0) {
modules_.push_back(bwe_.get());
}
VideoSender::~VideoSender() {}
void VideoSender::Pause() {
previous_sending_bitrate_ = TargetBitrateKbps();
PacketSender::Pause();
}
void VideoSender::Resume(int64_t paused_time_ms) {
source_->SetBitrateBps(previous_sending_bitrate_);
PacketSender::Resume(paused_time_ms);
}
void VideoSender::RunFor(int64_t time_ms, Packets* in_out) {
std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets(
in_out, clock_.TimeInMilliseconds() + time_ms, source_->flow_id());
ProcessFeedbackAndGeneratePackets(time_ms, &feedbacks, in_out);
}
VideoSource* VideoSender::source() const {
return source_;
}
void VideoSender::ProcessFeedbackAndGeneratePackets(
int64_t time_ms,
std::list<FeedbackPacket*>* feedbacks,
Packets* packets) {
do {
// Make sure to at least run Process() below every 100 ms.
int64_t time_to_run_ms = std::min<int64_t>(time_ms, 100);
if (!feedbacks->empty()) {
int64_t time_until_feedback_ms =
feedbacks->front()->send_time_ms() - clock_.TimeInMilliseconds();
time_to_run_ms =
std::max<int64_t>(std::min(time_ms, time_until_feedback_ms), 0);
}
if (!running_) {
source_->SetBitrateBps(0);
}
Packets generated;
source_->RunFor(time_to_run_ms, &generated);
bwe_->OnPacketsSent(generated);
packets->merge(generated, DereferencingComparator<Packet>);
clock_.AdvanceTimeMilliseconds(time_to_run_ms);
if (!feedbacks->empty()) {
bwe_->GiveFeedback(*feedbacks->front());
delete feedbacks->front();
feedbacks->pop_front();
}
bwe_->Process();
time_ms -= time_to_run_ms;
} while (time_ms > 0);
assert(feedbacks->empty());
}
int VideoSender::GetFeedbackIntervalMs() const {
return bwe_->GetFeedbackIntervalMs();
}
void VideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
uint8_t fraction_lost,
int64_t rtt) {
source_->SetBitrateBps(target_bitrate_bps);
RecordBitrate();
}
uint32_t VideoSender::TargetBitrateKbps() {
return (source_->bits_per_second() + 500) / 1000;
}
PacedVideoSender::PacedVideoSender(PacketProcessorListener* listener,
VideoSource* source,
BandwidthEstimatorType estimator)
: VideoSender(listener, source, estimator),
pacer_(
estimator == kBbrEstimator
? static_cast<Pacer*>(new BbrPacedSender(&clock_, this, nullptr))
: static_cast<Pacer*>(new PacedSender(&clock_, this, nullptr))) {
modules_.push_back(pacer_.get());
pacer_->SetPacingRates(source->bits_per_second() * kPaceMultiplier, 0);
}
PacedVideoSender::~PacedVideoSender() {
for (Packet* packet : pacer_queue_)
delete packet;
for (Packet* packet : queue_)
delete packet;
}
void PacedVideoSender::RunFor(int64_t time_ms, Packets* in_out) {
int64_t end_time_ms = clock_.TimeInMilliseconds() + time_ms;
// Run process periodically to allow the packets to be paced out.
std::list<FeedbackPacket*> feedbacks =
GetFeedbackPackets(in_out, end_time_ms, source_->flow_id());
int64_t last_run_time_ms = -1;
BWE_TEST_LOGGING_CONTEXT("Sender");
BWE_TEST_LOGGING_CONTEXT(source_->flow_id());
do {
int64_t time_until_process_ms = TimeUntilNextProcess(modules_);
int64_t time_until_feedback_ms = time_ms;
if (!feedbacks.empty())
time_until_feedback_ms = std::max<int64_t>(
feedbacks.front()->send_time_ms() - clock_.TimeInMilliseconds(), 0);
int64_t time_until_next_event_ms =
std::min(time_until_feedback_ms, time_until_process_ms);
time_until_next_event_ms =
std::min(source_->GetTimeUntilNextFrameMs(), time_until_next_event_ms);
// Never run for longer than we have been asked for.
if (clock_.TimeInMilliseconds() + time_until_next_event_ms > end_time_ms)
time_until_next_event_ms = end_time_ms - clock_.TimeInMilliseconds();
// Make sure we don't get stuck if an event doesn't trigger. This typically
// happens if the prober wants to probe, but there's no packet to send.
if (time_until_next_event_ms == 0 && last_run_time_ms == 0)
time_until_next_event_ms = 1;
last_run_time_ms = time_until_next_event_ms;
Packets generated_packets;
source_->RunFor(time_until_next_event_ms, &generated_packets);
if (!generated_packets.empty()) {
for (Packet* packet : generated_packets) {
MediaPacket* media_packet = static_cast<MediaPacket*>(packet);
pacer_->InsertPacket(
PacedSender::kNormalPriority, media_packet->header().ssrc,
media_packet->header().sequenceNumber, media_packet->send_time_ms(),
media_packet->payload_size(), false);
pacer_queue_size_in_bytes_ += media_packet->payload_size();
pacer_queue_.push_back(packet);
assert(pacer_queue_.size() < 10000);
}
}
clock_.AdvanceTimeMilliseconds(time_until_next_event_ms);
if (time_until_next_event_ms == time_until_feedback_ms) {
if (!feedbacks.empty()) {
bwe_->GiveFeedback(*feedbacks.front());
delete feedbacks.front();
feedbacks.pop_front();
}
bwe_->Process();
}
if (time_until_next_event_ms == time_until_process_ms) {
CallProcess(modules_);
}
} while (clock_.TimeInMilliseconds() < end_time_ms);
QueuePackets(in_out, end_time_ms * 1000);
}
int64_t PacedVideoSender::TimeUntilNextProcess(
const std::list<Module*>& modules) {
int64_t time_until_next_process_ms = 10;
for (Module* module : modules) {
int64_t next_process_ms = module->TimeUntilNextProcess();
if (next_process_ms < time_until_next_process_ms)
time_until_next_process_ms = next_process_ms;
}
if (time_until_next_process_ms < 0)
time_until_next_process_ms = 0;
return time_until_next_process_ms;
}
void PacedVideoSender::CallProcess(const std::list<Module*>& modules) {
for (Module* module : modules) {
if (module->TimeUntilNextProcess() <= 0) {
module->Process();
}
}
}
void PacedVideoSender::QueuePackets(Packets* batch,
int64_t end_of_batch_time_us) {
queue_.merge(*batch, DereferencingComparator<Packet>);
if (queue_.empty()) {
return;
}
Packets::iterator it = queue_.begin();
for (; it != queue_.end(); ++it) {
if ((*it)->send_time_us() > end_of_batch_time_us) {
break;
}
}
Packets to_transfer;
to_transfer.splice(to_transfer.begin(), queue_, queue_.begin(), it);
bwe_->OnPacketsSent(to_transfer);
batch->merge(to_transfer, DereferencingComparator<Packet>);
}
RtpPacketSendResult PacedVideoSender::TimeToSendPacket(
uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
bool retransmission,
const PacedPacketInfo& pacing_info) {
for (Packets::iterator it = pacer_queue_.begin(); it != pacer_queue_.end();
++it) {
MediaPacket* media_packet = static_cast<MediaPacket*>(*it);
if (media_packet->header().sequenceNumber == sequence_number) {
int64_t pace_out_time_ms = clock_.TimeInMilliseconds();
// Make sure a packet is never paced out earlier than when it was put into
// the pacer.
RTC_CHECK_GE(pace_out_time_ms, media_packet->send_time_ms());
media_packet->SetAbsSendTimeMs(pace_out_time_ms);
media_packet->set_send_time_us(1000 * pace_out_time_ms);
media_packet->set_sender_timestamp_us(1000 * pace_out_time_ms);
queue_.push_back(media_packet);
pacer_queue_size_in_bytes_ -= media_packet->payload_size();
pacer_queue_.erase(it);
return RtpPacketSendResult::kSuccess;
}
}
return RtpPacketSendResult::kTransportUnavailable;
}
size_t PacedVideoSender::TimeToSendPadding(size_t bytes,
const PacedPacketInfo& pacing_info) {
return 0;
}
void PacedVideoSender::OnNetworkChanged(uint32_t target_bitrate_bps,
uint8_t fraction_lost,
int64_t rtt) {
VideoSender::OnNetworkChanged(target_bitrate_bps, fraction_lost, rtt);
pacer_->SetPacingRates(target_bitrate_bps * kPaceMultiplier, 0);
}
void PacedVideoSender::OnNetworkChanged(uint32_t bitrate_for_encoder_bps,
uint32_t bitrate_for_pacer_bps,
bool in_probe_rtt,
int64_t target_set_time,
uint64_t congestion_window) {
VideoSender::OnNetworkChanged(bitrate_for_encoder_bps, 0u, 0u);
pacer_->SetEstimatedBitrateAndCongestionWindow(
bitrate_for_pacer_bps, in_probe_rtt, congestion_window);
}
size_t PacedVideoSender::pacer_queue_size_in_bytes() {
return pacer_queue_size_in_bytes_;
}
void PacedVideoSender::OnBytesAcked(size_t bytes) {
pacer_->OnBytesAcked(bytes);
}
const int kNoLimit = std::numeric_limits<int>::max();
const int kPacketSizeBytes = 1200;
TcpSender::TcpSender(PacketProcessorListener* listener,
int flow_id,
int64_t offset_ms)
: TcpSender(listener, flow_id, offset_ms, kNoLimit) {}
TcpSender::TcpSender(PacketProcessorListener* listener,
int flow_id,
int64_t offset_ms,
int send_limit_bytes)
: PacketSender(listener, flow_id),
cwnd_(10),
ssthresh_(kNoLimit),
ack_received_(false),
last_acked_seq_num_(0),
next_sequence_number_(0),
offset_ms_(offset_ms),
last_reduction_time_ms_(-1),
last_rtt_ms_(0),
total_sent_bytes_(0),
send_limit_bytes_(send_limit_bytes),
last_generated_packets_ms_(0),
num_recent_sent_packets_(0),
bitrate_kbps_(0) {}
TcpSender::~TcpSender() = default;
void TcpSender::RunFor(int64_t time_ms, Packets* in_out) {
if (clock_.TimeInMilliseconds() + time_ms < offset_ms_) {
clock_.AdvanceTimeMilliseconds(time_ms);
if (running_) {
Pause();
}
return;
}
if (!running_ && total_sent_bytes_ == 0) {
Resume(offset_ms_);
}
int64_t start_time_ms = clock_.TimeInMilliseconds();
std::list<FeedbackPacket*> feedbacks = GetFeedbackPackets(
in_out, clock_.TimeInMilliseconds() + time_ms, *flow_ids().begin());
// The number of packets which are sent in during time_ms depends on the
// number of packets in_flight_ and the max number of packets in flight
// (cwnd_). Therefore SendPackets() isn't directly dependent on time_ms.
for (FeedbackPacket* fb : feedbacks) {
clock_.AdvanceTimeMilliseconds(fb->send_time_ms() -
clock_.TimeInMilliseconds());
last_rtt_ms_ = fb->send_time_ms() - fb->latest_send_time_ms();
UpdateCongestionControl(fb);
SendPackets(in_out);
}
for (auto it = in_flight_.begin(); it != in_flight_.end();) {
if (it->time_ms < clock_.TimeInMilliseconds() - 1000)
in_flight_.erase(it++);
else
++it;
}
clock_.AdvanceTimeMilliseconds(time_ms -
(clock_.TimeInMilliseconds() - start_time_ms));
SendPackets(in_out);
}
int TcpSender::GetFeedbackIntervalMs() const {
return 10;
}
void TcpSender::SendPackets(Packets* in_out) {
int cwnd = std::ceil(cwnd_);
int packets_to_send = std::max(cwnd - static_cast<int>(in_flight_.size()), 0);
int timed_out = TriggerTimeouts();
if (timed_out > 0) {
HandleLoss();
}
if (packets_to_send > 0) {
Packets generated = GeneratePackets(packets_to_send);
for (Packet* packet : generated)
in_flight_.insert(InFlight(*static_cast<MediaPacket*>(packet)));
in_out->merge(generated, DereferencingComparator<Packet>);
}
}
void TcpSender::UpdateCongestionControl(const FeedbackPacket* fb) {
const TcpFeedback* tcp_fb = static_cast<const TcpFeedback*>(fb);
RTC_DCHECK(!tcp_fb->acked_packets().empty());
ack_received_ = true;
uint16_t expected = tcp_fb->acked_packets().back() - last_acked_seq_num_;
uint16_t missing =
expected - static_cast<uint16_t>(tcp_fb->acked_packets().size());
for (uint16_t ack_seq_num : tcp_fb->acked_packets())
in_flight_.erase(InFlight(ack_seq_num, clock_.TimeInMilliseconds()));
if (missing > 0) {
HandleLoss();
} else if (cwnd_ <= ssthresh_) {
cwnd_ += tcp_fb->acked_packets().size();
} else {
cwnd_ += 1.0f / cwnd_;
}
last_acked_seq_num_ =
LatestSequenceNumber(tcp_fb->acked_packets().back(), last_acked_seq_num_);
}
int TcpSender::TriggerTimeouts() {
int timed_out = 0;
for (auto it = in_flight_.begin(); it != in_flight_.end();) {
if (it->time_ms < clock_.TimeInMilliseconds() - 1000) {
in_flight_.erase(it++);
++timed_out;
} else {
++it;
}
}
return timed_out;
}
void TcpSender::HandleLoss() {
if (clock_.TimeInMilliseconds() - last_reduction_time_ms_ < last_rtt_ms_)
return;
last_reduction_time_ms_ = clock_.TimeInMilliseconds();
ssthresh_ = std::max(static_cast<int>(in_flight_.size() / 2), 2);
cwnd_ = ssthresh_;
}
Packets TcpSender::GeneratePackets(size_t num_packets) {
Packets generated;
UpdateSendBitrateEstimate(num_packets);
for (size_t i = 0; i < num_packets; ++i) {
if ((total_sent_bytes_ + kPacketSizeBytes) > send_limit_bytes_) {
if (running_) {
Pause();
}
break;
}
generated.push_back(
new MediaPacket(*flow_ids().begin(), 1000 * clock_.TimeInMilliseconds(),
kPacketSizeBytes, next_sequence_number_++));
generated.back()->set_sender_timestamp_us(1000 *
clock_.TimeInMilliseconds());
total_sent_bytes_ += kPacketSizeBytes;
}
return generated;
}
void TcpSender::UpdateSendBitrateEstimate(size_t num_packets) {
const int kTimeWindowMs = 500;
num_recent_sent_packets_ += num_packets;
int64_t delta_ms = clock_.TimeInMilliseconds() - last_generated_packets_ms_;
if (delta_ms >= kTimeWindowMs) {
bitrate_kbps_ =
static_cast<uint32_t>(8 * num_recent_sent_packets_ * kPacketSizeBytes) /
delta_ms;
last_generated_packets_ms_ = clock_.TimeInMilliseconds();
num_recent_sent_packets_ = 0;
}
RecordBitrate();
}
uint32_t TcpSender::TargetBitrateKbps() {
return bitrate_kbps_;
}
} // namespace bwe
} // namespace testing
} // namespace webrtc