Minimizes risk of probes being late when using TaskQueuePacedSender.
The time precision of delayed tasks is one millisecond, so the TaskQueuePacedSender makes sure that is the minimum sleep time, and then allows sending prior data as if it was on time. Furthermore, if there already exists a pending task within 1ms of a new desired process time - we don't schedule a new one with the same motivation as above. These two facts clashes somewhat with how BitrateProber works, and especially if they coincide it can result in scheduled ProcessPackets() that is 2ms late. The default timeout set in BitrateProber is 3ms, so there is a higher risk of probes timing out. This CL changes the TaskQueuePacedSender to allow scheduling a ProcesPackets() call as soon as possible if we are probing - even if that means executing up to 1ms earlier than expected (the BitrateProber will compensate for that). The PacingController is updated in order to allow early execution in this one case. Bug: webrtc:10809 Change-Id: Ia5097ddc39aa80c05ebfe56369310c94ef0e0baf Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/178901 Reviewed-by: Sebastian Jansson <srte@webrtc.org> Commit-Queue: Erik Språng <sprang@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31778}
This commit is contained in:
@ -93,6 +93,7 @@ if (rtc_include_tests) {
|
||||
deps = [
|
||||
":interval_budget",
|
||||
":pacing",
|
||||
"../../api/transport:network_control",
|
||||
"../../api/units:data_rate",
|
||||
"../../api/units:time_delta",
|
||||
"../../modules/utility:mock_process_thread",
|
||||
|
||||
@ -39,6 +39,11 @@ constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2);
|
||||
// time. Applies only to periodic mode.
|
||||
constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis(30);
|
||||
|
||||
// Allow probes to be processed slightly ahead of inteded send time. Currently
|
||||
// set to 1ms as this is intended to allow times be rounded down to the nearest
|
||||
// millisecond.
|
||||
constexpr TimeDelta kMaxEarlyProbeProcessing = TimeDelta::Millis(1);
|
||||
|
||||
constexpr int kFirstPriority = 0;
|
||||
|
||||
bool IsDisabled(const WebRtcKeyValueConfig& field_trials,
|
||||
@ -306,7 +311,9 @@ void PacingController::EnqueuePacketInternal(
|
||||
}
|
||||
|
||||
TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
|
||||
if (last_process_time_.IsMinusInfinity()) {
|
||||
// If no previous processing, or last process was "in the future" because of
|
||||
// early probe processing, then there is no elapsed time to add budget for.
|
||||
if (last_process_time_.IsMinusInfinity() || now < last_process_time_) {
|
||||
return TimeDelta::Zero();
|
||||
}
|
||||
RTC_DCHECK_GE(now, last_process_time_);
|
||||
@ -400,10 +407,13 @@ void PacingController::ProcessPackets() {
|
||||
Timestamp target_send_time = now;
|
||||
if (mode_ == ProcessMode::kDynamic) {
|
||||
target_send_time = NextSendTime();
|
||||
TimeDelta early_execute_margin =
|
||||
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
|
||||
if (target_send_time.IsMinusInfinity()) {
|
||||
target_send_time = now;
|
||||
} else if (now < target_send_time) {
|
||||
} else if (now < target_send_time - early_execute_margin) {
|
||||
// We are too early, but if queue is empty still allow draining some debt.
|
||||
// Probing is allowed to be sent up to kMinSleepTime early.
|
||||
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
|
||||
UpdateBudgetWithElapsedTime(elapsed_time);
|
||||
return;
|
||||
@ -582,7 +592,7 @@ void PacingController::ProcessPackets() {
|
||||
|
||||
// If we are currently probing, we need to stop the send loop when we have
|
||||
// reached the send target.
|
||||
if (is_probing && data_sent > recommended_probe_size) {
|
||||
if (is_probing && data_sent >= recommended_probe_size) {
|
||||
break;
|
||||
}
|
||||
|
||||
@ -703,8 +713,9 @@ void PacingController::OnPaddingSent(DataSize data_sent) {
|
||||
if (data_sent > DataSize::Zero()) {
|
||||
UpdateBudgetWithSentData(data_sent);
|
||||
}
|
||||
last_send_time_ = CurrentTime();
|
||||
last_process_time_ = CurrentTime();
|
||||
Timestamp now = CurrentTime();
|
||||
last_send_time_ = now;
|
||||
last_process_time_ = now;
|
||||
}
|
||||
|
||||
void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
|
||||
|
||||
@ -218,20 +218,30 @@ void TaskQueuePacedSender::MaybeProcessPackets(
|
||||
next_process_time = pacing_controller_.NextSendTime();
|
||||
}
|
||||
|
||||
const TimeDelta min_sleep = pacing_controller_.IsProbing()
|
||||
? PacingController::kMinSleepTime
|
||||
: hold_back_window_;
|
||||
next_process_time = std::max(now + min_sleep, next_process_time);
|
||||
absl::optional<TimeDelta> time_to_next_process;
|
||||
if (pacing_controller_.IsProbing() &&
|
||||
next_process_time != next_process_time_) {
|
||||
// If we're probing and there isn't already a wakeup scheduled for the next
|
||||
// process time, always post a task and just round sleep time down to
|
||||
// nearest millisecond.
|
||||
time_to_next_process =
|
||||
std::max(TimeDelta::Zero(),
|
||||
(next_process_time - now).RoundDownTo(TimeDelta::Millis(1)));
|
||||
} else if (next_process_time_.IsMinusInfinity() ||
|
||||
next_process_time <= next_process_time_ - hold_back_window_) {
|
||||
// Schedule a new task since there is none currently scheduled
|
||||
// (|next_process_time_| is infinite), or the new process time is at least
|
||||
// one holdback window earlier than whatever is currently scheduled.
|
||||
time_to_next_process = std::max(next_process_time - now, hold_back_window_);
|
||||
}
|
||||
|
||||
TimeDelta sleep_time = next_process_time - now;
|
||||
if (next_process_time_.IsMinusInfinity() ||
|
||||
next_process_time <=
|
||||
next_process_time_ - PacingController::kMinSleepTime) {
|
||||
if (time_to_next_process) {
|
||||
// Set a new scheduled process time and post a delayed task.
|
||||
next_process_time_ = next_process_time;
|
||||
|
||||
task_queue_.PostDelayedTask(
|
||||
[this, next_process_time]() { MaybeProcessPackets(next_process_time); },
|
||||
sleep_time.ms<uint32_t>());
|
||||
time_to_next_process->ms<uint32_t>());
|
||||
}
|
||||
|
||||
MaybeUpdateStats(false);
|
||||
|
||||
@ -10,12 +10,14 @@
|
||||
|
||||
#include "modules/pacing/task_queue_paced_sender.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "api/transport/network_types.h"
|
||||
#include "modules/pacing/packet_router.h"
|
||||
#include "modules/utility/include/mock/mock_process_thread.h"
|
||||
#include "test/field_trial.h"
|
||||
@ -63,13 +65,12 @@ class StatsUpdateObserver {
|
||||
|
||||
class TaskQueuePacedSenderForTest : public TaskQueuePacedSender {
|
||||
public:
|
||||
TaskQueuePacedSenderForTest(
|
||||
Clock* clock,
|
||||
PacketRouter* packet_router,
|
||||
RtcEventLog* event_log,
|
||||
const WebRtcKeyValueConfig* field_trials,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
TimeDelta hold_back_window = PacingController::kMinSleepTime)
|
||||
TaskQueuePacedSenderForTest(Clock* clock,
|
||||
PacketRouter* packet_router,
|
||||
RtcEventLog* event_log,
|
||||
const WebRtcKeyValueConfig* field_trials,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
TimeDelta hold_back_window)
|
||||
: TaskQueuePacedSender(clock,
|
||||
packet_router,
|
||||
event_log,
|
||||
@ -84,6 +85,27 @@ class TaskQueuePacedSenderForTest : public TaskQueuePacedSender {
|
||||
|
||||
size_t num_stats_updates_ = 0;
|
||||
};
|
||||
|
||||
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
|
||||
DataSize target_size) {
|
||||
// 224 bytes is the max padding size for plain padding packets generated by
|
||||
// RTPSender::GeneratePadding().
|
||||
const DataSize kMaxPaddingPacketSize = DataSize::Bytes(224);
|
||||
DataSize padding_generated = DataSize::Zero();
|
||||
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets;
|
||||
while (padding_generated < target_size) {
|
||||
DataSize packet_size =
|
||||
std::min(target_size - padding_generated, kMaxPaddingPacketSize);
|
||||
padding_generated += packet_size;
|
||||
auto padding_packet =
|
||||
std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr);
|
||||
padding_packet->set_packet_type(RtpPacketMediaType::kPadding);
|
||||
padding_packet->SetPadding(packet_size.bytes());
|
||||
padding_packets.push_back(std::move(padding_packet));
|
||||
}
|
||||
return padding_packets;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace test {
|
||||
@ -406,5 +428,125 @@ namespace test {
|
||||
EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
|
||||
}
|
||||
|
||||
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
|
||||
ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
|
||||
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
|
||||
MockPacketRouter packet_router;
|
||||
TaskQueuePacedSenderForTest pacer(
|
||||
time_controller.GetClock(), &packet_router,
|
||||
/*event_log=*/nullptr,
|
||||
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
|
||||
PacingController::kMinSleepTime);
|
||||
|
||||
// Set rates so one packet adds 4ms of buffer level.
|
||||
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
|
||||
const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
|
||||
const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
|
||||
pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
|
||||
EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
|
||||
return std::vector<std::unique_ptr<RtpPacketToSend>>();
|
||||
});
|
||||
EXPECT_CALL(packet_router, GeneratePadding(_))
|
||||
.WillRepeatedly(
|
||||
[](DataSize target_size) { return GeneratePadding(target_size); });
|
||||
|
||||
// Enqueue two packets, only the first is sent immediately and the next
|
||||
// will be scheduled for sending in 4ms.
|
||||
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2));
|
||||
const int kNotAProbe = PacedPacketInfo::kNotAProbe;
|
||||
EXPECT_CALL(
|
||||
packet_router,
|
||||
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
|
||||
kNotAProbe)));
|
||||
// Advance to less than 3ms before next packet send time.
|
||||
time_controller.AdvanceTime(TimeDelta::Micros(1001));
|
||||
|
||||
// Trigger a probe at 4x the current pacing rate and insert the number of
|
||||
// packets the probe needs.
|
||||
const DataRate kProbeRate = 2 * kPacingDataRate;
|
||||
const int kProbeClusterId = 1;
|
||||
pacer.CreateProbeCluster(kProbeRate, kProbeClusterId);
|
||||
|
||||
// Expected size for each probe in a cluster is twice the expected bits
|
||||
// sent during min_probe_delta.
|
||||
const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2);
|
||||
const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
|
||||
const size_t kNumPacketsInProbe =
|
||||
(kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize;
|
||||
EXPECT_CALL(
|
||||
packet_router,
|
||||
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
|
||||
kProbeClusterId)))
|
||||
.Times(kNumPacketsInProbe);
|
||||
|
||||
pacer.EnqueuePackets(
|
||||
GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe));
|
||||
time_controller.AdvanceTime(TimeDelta::Zero());
|
||||
|
||||
// The pacer should have scheduled the next probe to be sent in
|
||||
// kProbeTimeDelta. That there was existing scheduled call less than
|
||||
// PacingController::kMinSleepTime before this should not matter.
|
||||
|
||||
EXPECT_CALL(
|
||||
packet_router,
|
||||
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
|
||||
kProbeClusterId)))
|
||||
.Times(AtLeast(1));
|
||||
time_controller.AdvanceTime(TimeDelta::Millis(2));
|
||||
}
|
||||
|
||||
TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
|
||||
// Set min_probe_delta to be less than kMinSleepTime (1ms).
|
||||
const TimeDelta kMinProbeDelta = TimeDelta::Micros(100);
|
||||
ScopedFieldTrials trials(
|
||||
"WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
|
||||
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
|
||||
MockPacketRouter packet_router;
|
||||
TaskQueuePacedSenderForTest pacer(
|
||||
time_controller.GetClock(), &packet_router,
|
||||
/*event_log=*/nullptr,
|
||||
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
|
||||
PacingController::kMinSleepTime);
|
||||
|
||||
// Set rates so one packet adds 4ms of buffer level.
|
||||
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
|
||||
const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
|
||||
const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
|
||||
pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
|
||||
EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
|
||||
return std::vector<std::unique_ptr<RtpPacketToSend>>();
|
||||
});
|
||||
EXPECT_CALL(packet_router, GeneratePadding)
|
||||
.WillRepeatedly(
|
||||
[](DataSize target_size) { return GeneratePadding(target_size); });
|
||||
|
||||
// Set a high probe rate.
|
||||
const int kProbeClusterId = 1;
|
||||
DataRate kProbingRate = kPacingDataRate * 10;
|
||||
pacer.CreateProbeCluster(kProbingRate, kProbeClusterId);
|
||||
|
||||
// Advance time less than PacingController::kMinSleepTime, probing packets
|
||||
// for the first millisecond should be sent immediately. Min delta between
|
||||
// probes is 2x 100us, meaning 4 times per ms we will get least one call to
|
||||
// SendPacket().
|
||||
DataSize data_sent = DataSize::Zero();
|
||||
EXPECT_CALL(
|
||||
packet_router,
|
||||
SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
|
||||
kProbeClusterId)))
|
||||
.Times(AtLeast(4))
|
||||
.WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
|
||||
const PacedPacketInfo&) {
|
||||
data_sent +=
|
||||
DataSize::Bytes(packet->payload_size() + packet->padding_size());
|
||||
});
|
||||
|
||||
// Add one packet to kickstart probing, the rest will be padding packets.
|
||||
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
|
||||
time_controller.AdvanceTime(kMinProbeDelta);
|
||||
|
||||
// Verify the amount of probing data sent.
|
||||
EXPECT_EQ(data_sent, kProbingRate * TimeDelta::Millis(1));
|
||||
}
|
||||
} // namespace test
|
||||
} // namespace webrtc
|
||||
|
||||
Reference in New Issue
Block a user