Fix pacer to accept duplicate sequence numbers on different SSRCs.

BUG=3550
R=stefan@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/17889004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@6610 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
pbos@webrtc.org
2014-07-07 10:20:35 +00:00
parent b941fe8098
commit 03c817e405
3 changed files with 107 additions and 65 deletions

View File

@ -16,6 +16,7 @@
#include "webrtc/modules/interface/module.h"
#include "webrtc/system_wrappers/interface/scoped_ptr.h"
#include "webrtc/system_wrappers/interface/thread_annotations.h"
#include "webrtc/system_wrappers/interface/tick_util.h"
#include "webrtc/typedefs.h"
@ -113,41 +114,50 @@ class PacedSender : public Module {
private:
// Return true if next packet in line should be transmitted.
// Return packet list that contains the next packet.
bool ShouldSendNextPacket(paced_sender::PacketList** packet_list);
bool ShouldSendNextPacket(paced_sender::PacketList** packet_list)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
// Local helper function to GetNextPacket.
paced_sender::Packet GetNextPacketFromList(paced_sender::PacketList* packets);
paced_sender::Packet GetNextPacketFromList(paced_sender::PacketList* packets)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool SendPacketFromList(paced_sender::PacketList* packet_list);
bool SendPacketFromList(paced_sender::PacketList* packet_list)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBytesPerInterval(uint32_t delta_time_in_ms);
void UpdateBytesPerInterval(uint32_t delta_time_in_ms)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
// Updates the buffers with the number of bytes that we sent.
void UpdateMediaBytesSent(int num_bytes);
void UpdateMediaBytesSent(int num_bytes) EXCLUSIVE_LOCKS_REQUIRED(critsect_);
Clock* const clock_;
Callback* const callback_;
Clock* clock_;
Callback* callback_;
bool enabled_;
bool paused_;
int max_queue_length_ms_;
scoped_ptr<CriticalSectionWrapper> critsect_;
bool enabled_ GUARDED_BY(critsect_);
bool paused_ GUARDED_BY(critsect_);
int max_queue_length_ms_ GUARDED_BY(critsect_);
// This is the media budget, keeping track of how many bits of media
// we can pace out during the current interval.
scoped_ptr<paced_sender::IntervalBudget> media_budget_;
scoped_ptr<paced_sender::IntervalBudget> media_budget_ GUARDED_BY(critsect_);
// This is the padding budget, keeping track of how many bits of padding we're
// allowed to send out during the current interval. This budget will be
// utilized when there's no media to send.
scoped_ptr<paced_sender::IntervalBudget> padding_budget_;
scoped_ptr<paced_sender::IntervalBudget> padding_budget_
GUARDED_BY(critsect_);
TickTime time_last_update_;
TickTime time_last_send_;
int64_t capture_time_ms_last_queued_;
int64_t capture_time_ms_last_sent_;
TickTime time_last_update_ GUARDED_BY(critsect_);
TickTime time_last_send_ GUARDED_BY(critsect_);
int64_t capture_time_ms_last_queued_ GUARDED_BY(critsect_);
int64_t capture_time_ms_last_sent_ GUARDED_BY(critsect_);
scoped_ptr<paced_sender::PacketList> high_priority_packets_;
scoped_ptr<paced_sender::PacketList> normal_priority_packets_;
scoped_ptr<paced_sender::PacketList> low_priority_packets_;
scoped_ptr<paced_sender::PacketList> high_priority_packets_
GUARDED_BY(critsect_);
scoped_ptr<paced_sender::PacketList> normal_priority_packets_
GUARDED_BY(critsect_);
scoped_ptr<paced_sender::PacketList> low_priority_packets_
GUARDED_BY(critsect_);
};
} // namespace webrtc
#endif // WEBRTC_MODULES_PACED_SENDER_H_

View File

@ -12,6 +12,9 @@
#include <assert.h>
#include <map>
#include <set>
#include "webrtc/modules/interface/module_common_types.h"
#include "webrtc/system_wrappers/interface/clock.h"
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
@ -36,21 +39,24 @@ namespace webrtc {
namespace paced_sender {
struct Packet {
Packet(uint32_t ssrc, uint16_t seq_number, int64_t capture_time_ms,
int64_t enqueue_time_ms, int length_in_bytes, bool retransmission)
: ssrc_(ssrc),
sequence_number_(seq_number),
capture_time_ms_(capture_time_ms),
enqueue_time_ms_(enqueue_time_ms),
bytes_(length_in_bytes),
retransmission_(retransmission) {
}
uint32_t ssrc_;
uint16_t sequence_number_;
int64_t capture_time_ms_;
int64_t enqueue_time_ms_;
int bytes_;
bool retransmission_;
Packet(uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
int length_in_bytes,
bool retransmission)
: ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
bytes(length_in_bytes),
retransmission(retransmission) {}
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms;
int64_t enqueue_time_ms;
int bytes;
bool retransmission;
};
// STL list style class which prevents duplicates in the list.
@ -68,23 +74,24 @@ class PacketList {
void pop_front() {
Packet& packet = packet_list_.front();
uint16_t sequence_number = packet.sequence_number_;
uint16_t sequence_number = packet.sequence_number;
uint32_t ssrc = packet.ssrc;
packet_list_.pop_front();
sequence_number_set_.erase(sequence_number);
sequence_number_set_[ssrc].erase(sequence_number);
}
void push_back(const Packet& packet) {
if (sequence_number_set_.find(packet.sequence_number_) ==
sequence_number_set_.end()) {
if (sequence_number_set_[packet.ssrc].find(packet.sequence_number) ==
sequence_number_set_[packet.ssrc].end()) {
// Don't insert duplicates.
packet_list_.push_back(packet);
sequence_number_set_.insert(packet.sequence_number_);
sequence_number_set_[packet.ssrc].insert(packet.sequence_number);
}
}
private:
std::list<Packet> packet_list_;
std::set<uint16_t> sequence_number_set_;
std::map<uint32_t, std::set<uint16_t> > sequence_number_set_;
};
class IntervalBudget {
@ -129,10 +136,10 @@ PacedSender::PacedSender(Clock* clock,
int min_bitrate_kbps)
: clock_(clock),
callback_(callback),
critsect_(CriticalSectionWrapper::CreateCriticalSection()),
enabled_(true),
paused_(false),
max_queue_length_ms_(kDefaultMaxQueueLengthMs),
critsect_(CriticalSectionWrapper::CreateCriticalSection()),
media_budget_(new paced_sender::IntervalBudget(max_bitrate_kbps)),
padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
time_last_update_(TickTime::Now()),
@ -222,19 +229,19 @@ int PacedSender::QueueInMs() const {
int64_t now_ms = clock_->TimeInMilliseconds();
int64_t oldest_packet_enqueue_time = now_ms;
if (!high_priority_packets_->empty()) {
oldest_packet_enqueue_time = std::min(
oldest_packet_enqueue_time,
high_priority_packets_->front().enqueue_time_ms_);
oldest_packet_enqueue_time =
std::min(oldest_packet_enqueue_time,
high_priority_packets_->front().enqueue_time_ms);
}
if (!normal_priority_packets_->empty()) {
oldest_packet_enqueue_time = std::min(
oldest_packet_enqueue_time,
normal_priority_packets_->front().enqueue_time_ms_);
oldest_packet_enqueue_time =
std::min(oldest_packet_enqueue_time,
normal_priority_packets_->front().enqueue_time_ms);
}
if (!low_priority_packets_->empty()) {
oldest_packet_enqueue_time = std::min(
oldest_packet_enqueue_time,
low_priority_packets_->front().enqueue_time_ms_);
oldest_packet_enqueue_time =
std::min(oldest_packet_enqueue_time,
low_priority_packets_->front().enqueue_time_ms);
}
return now_ms - oldest_packet_enqueue_time;
}
@ -291,10 +298,10 @@ bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
paced_sender::Packet packet = GetNextPacketFromList(packet_list);
critsect_->Leave();
const bool success = callback_->TimeToSendPacket(packet.ssrc_,
packet.sequence_number_,
packet.capture_time_ms_,
packet.retransmission_);
const bool success = callback_->TimeToSendPacket(packet.ssrc,
packet.sequence_number,
packet.capture_time_ms,
packet.retransmission);
critsect_->Enter();
// If packet cannot be sent then keep it in packet list and exit early.
// There's no need to send more packets.
@ -302,15 +309,15 @@ bool PacedSender::SendPacketFromList(paced_sender::PacketList* packet_list)
return false;
}
packet_list->pop_front();
const bool last_packet = packet_list->empty() ||
packet_list->front().capture_time_ms_ > packet.capture_time_ms_;
const bool last_packet =
packet_list->empty() ||
packet_list->front().capture_time_ms > packet.capture_time_ms;
if (packet_list != high_priority_packets_.get()) {
if (packet.capture_time_ms_ > capture_time_ms_last_sent_) {
capture_time_ms_last_sent_ = packet.capture_time_ms_;
} else if (packet.capture_time_ms_ == capture_time_ms_last_sent_ &&
if (packet.capture_time_ms > capture_time_ms_last_sent_) {
capture_time_ms_last_sent_ = packet.capture_time_ms;
} else if (packet.capture_time_ms == capture_time_ms_last_sent_ &&
last_packet) {
TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend",
packet.capture_time_ms_);
TRACE_EVENT_ASYNC_END0("webrtc_rtp", "PacedSend", packet.capture_time_ms);
}
}
return true;
@ -344,12 +351,13 @@ bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
int64_t high_priority_capture_time = -1;
if (!high_priority_packets_->empty()) {
high_priority_capture_time =
high_priority_packets_->front().capture_time_ms_;
high_priority_packets_->front().capture_time_ms;
*packet_list = high_priority_packets_.get();
}
if (!normal_priority_packets_->empty() &&
(high_priority_capture_time == -1 || high_priority_capture_time >
normal_priority_packets_->front().capture_time_ms_)) {
(high_priority_capture_time == -1 ||
high_priority_capture_time >
normal_priority_packets_->front().capture_time_ms)) {
*packet_list = normal_priority_packets_.get();
}
if (*packet_list)
@ -375,7 +383,7 @@ bool PacedSender::ShouldSendNextPacket(paced_sender::PacketList** packet_list) {
paced_sender::Packet PacedSender::GetNextPacketFromList(
paced_sender::PacketList* packets) {
paced_sender::Packet packet = packets->front();
UpdateMediaBytesSent(packet.bytes_);
UpdateMediaBytesSent(packet.bytes);
return packet;
}

View File

@ -213,6 +213,30 @@ TEST_F(PacedSenderTest, PaceQueuedPacketsWithDuplicates) {
send_bucket_->Process();
}
TEST_F(PacedSenderTest, CanQueuePacketsWithSameSequenceNumberOnDifferentSsrcs) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;
SendAndExpectPacket(PacedSender::kNormalPriority,
ssrc,
sequence_number,
clock_.TimeInMilliseconds(),
250,
false);
// Expect packet on second ssrc to be queued and sent as well.
SendAndExpectPacket(PacedSender::kNormalPriority,
ssrc + 1,
sequence_number,
clock_.TimeInMilliseconds(),
250,
false);
clock_.AdvanceTimeMilliseconds(1000);
TickTime::AdvanceFakeClock(1000);
send_bucket_->Process();
}
TEST_F(PacedSenderTest, Padding) {
uint32_t ssrc = 12345;
uint16_t sequence_number = 1234;