Add callbacks for receive channel RTP statistics

This allows a listener to receive new statistics (byte/packet counts, etc) as it
is generated - avoiding the need to poll. This also makes handling stats from
multiple RTP streams more tractable. The change is primarily targeted at the new
video engine API.

TEST=Unit test in ReceiveStatisticsTest.
Integration tests to follow as call tests when fully wired up.

BUG=2235
R=mflodman@webrtc.org, pbos@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/6259004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@5416 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
sprang@webrtc.org
2014-01-23 10:00:39 +00:00
parent 91db93d24f
commit 0e93257cee
9 changed files with 300 additions and 110 deletions

View File

@ -26,7 +26,8 @@ StreamStatistician::~StreamStatistician() {}
StreamStatisticianImpl::StreamStatisticianImpl(
Clock* clock,
RtcpStatisticsCallback* rtcp_callback)
RtcpStatisticsCallback* rtcp_callback,
StreamDataCountersCallback* rtp_callback)
: clock_(clock),
stream_lock_(CriticalSectionWrapper::CreateCriticalSection()),
incoming_bitrate_(clock, NULL),
@ -43,33 +44,26 @@ StreamStatisticianImpl::StreamStatisticianImpl(
received_seq_first_(0),
received_seq_max_(0),
received_seq_wraps_(0),
first_packet_(true),
received_packet_overhead_(12),
received_byte_count_(0),
received_retransmitted_packets_(0),
received_inorder_packet_count_(0),
last_report_inorder_packets_(0),
last_report_old_packets_(0),
last_report_seq_max_(0),
last_reported_statistics_(),
rtcp_callback_(rtcp_callback) {}
rtcp_callback_(rtcp_callback),
rtp_callback_(rtp_callback) {}
void StreamStatisticianImpl::ResetStatistics() {
CriticalSectionScoped cs(stream_lock_.get());
last_report_inorder_packets_ = 0;
last_report_old_packets_ = 0;
last_report_seq_max_ = 0;
memset(&last_reported_statistics_, 0, sizeof(last_reported_statistics_));
last_reported_statistics_ = RtcpStatistics();
jitter_q4_ = 0;
cumulative_loss_ = 0;
jitter_q4_transmission_time_offset_ = 0;
received_seq_wraps_ = 0;
received_seq_max_ = 0;
received_seq_first_ = 0;
received_byte_count_ = 0;
received_retransmitted_packets_ = 0;
received_inorder_packet_count_ = 0;
first_packet_ = true;
receive_counters_ = StreamDataCounters();
}
void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
@ -79,17 +73,17 @@ void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
bool in_order = InOrderPacketInternal(header.sequenceNumber);
ssrc_ = header.ssrc;
incoming_bitrate_.Update(bytes);
received_byte_count_ += bytes;
receive_counters_.bytes +=
bytes - (header.paddingLength + header.headerLength);
receive_counters_.header_bytes += header.headerLength;
receive_counters_.padding_bytes += header.paddingLength;
++receive_counters_.packets;
if (!in_order && retransmitted) {
++receive_counters_.retransmitted_packets;
}
if (first_packet_) {
first_packet_ = false;
// This is the first received report.
if (receive_counters_.packets == 1) {
received_seq_first_ = header.sequenceNumber;
received_seq_max_ = header.sequenceNumber;
received_inorder_packet_count_ = 1;
clock_->CurrentNtp(last_receive_time_secs_, last_receive_time_frac_);
last_receive_time_ms_ = clock_->TimeInMilliseconds();
return;
}
// Count only the new packets received. That is, if packets 1, 2, 3, 5, 4, 6
@ -99,66 +93,27 @@ void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
uint32_t receive_time_secs;
uint32_t receive_time_frac;
clock_->CurrentNtp(receive_time_secs, receive_time_frac);
received_inorder_packet_count_++;
// Wrong if we use RetransmitOfOldPacket.
int32_t seq_diff = header.sequenceNumber - received_seq_max_;
if (seq_diff < 0) {
if (receive_counters_.packets > 1 &&
received_seq_max_ > header.sequenceNumber) {
// Wrap around detected.
received_seq_wraps_++;
}
// New max.
received_seq_max_ = header.sequenceNumber;
// If new time stamp and more than one in-order packet received, calculate
// new jitter statistics.
if (header.timestamp != last_received_timestamp_ &&
received_inorder_packet_count_ > 1) {
uint32_t receive_time_rtp = ModuleRTPUtility::ConvertNTPTimeToRTP(
receive_time_secs, receive_time_frac, header.payload_type_frequency);
uint32_t last_receive_time_rtp = ModuleRTPUtility::ConvertNTPTimeToRTP(
last_receive_time_secs_, last_receive_time_frac_,
header.payload_type_frequency);
int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) -
(header.timestamp - last_received_timestamp_);
time_diff_samples = abs(time_diff_samples);
// lib_jingle sometimes deliver crazy jumps in TS for the same stream.
// If this happens, don't update jitter value. Use 5 secs video frequency
// as the threshold.
if (time_diff_samples < 450000) {
// Note we calculate in Q4 to avoid using float.
int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
}
// Extended jitter report, RFC 5450.
// Actual network jitter, excluding the source-introduced jitter.
int32_t time_diff_samples_ext =
(receive_time_rtp - last_receive_time_rtp) -
((header.timestamp +
header.extension.transmissionTimeOffset) -
(last_received_timestamp_ +
last_received_transmission_time_offset_));
time_diff_samples_ext = abs(time_diff_samples_ext);
if (time_diff_samples_ext < 450000) {
int32_t jitter_diffQ4TransmissionTimeOffset =
(time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_;
jitter_q4_transmission_time_offset_ +=
((jitter_diffQ4TransmissionTimeOffset + 8) >> 4);
}
(receive_counters_.packets - receive_counters_.retransmitted_packets) >
1) {
UpdateJitter(header, receive_time_secs, receive_time_frac);
}
last_received_timestamp_ = header.timestamp;
last_receive_time_secs_ = receive_time_secs;
last_receive_time_frac_ = receive_time_frac;
last_receive_time_ms_ = clock_->TimeInMilliseconds();
} else {
if (retransmitted) {
received_retransmitted_packets_++;
} else {
received_inorder_packet_count_++;
}
}
uint16_t packet_oh = header.headerLength + header.paddingLength;
@ -166,6 +121,55 @@ void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
// Our measured overhead. Filter from RFC 5104 4.2.1.2:
// avg_OH (new) = 15/16*avg_OH (old) + 1/16*pckt_OH,
received_packet_overhead_ = (15 * received_packet_overhead_ + packet_oh) >> 4;
rtp_callback_->DataCountersUpdated(receive_counters_, ssrc_);
}
void StreamStatisticianImpl::UpdateJitter(const RTPHeader& header,
uint32_t receive_time_secs,
uint32_t receive_time_frac) {
uint32_t receive_time_rtp = ModuleRTPUtility::ConvertNTPTimeToRTP(
receive_time_secs, receive_time_frac, header.payload_type_frequency);
uint32_t last_receive_time_rtp = ModuleRTPUtility::ConvertNTPTimeToRTP(
last_receive_time_secs_, last_receive_time_frac_,
header.payload_type_frequency);
int32_t time_diff_samples = (receive_time_rtp - last_receive_time_rtp) -
(header.timestamp - last_received_timestamp_);
time_diff_samples = abs(time_diff_samples);
// lib_jingle sometimes deliver crazy jumps in TS for the same stream.
// If this happens, don't update jitter value. Use 5 secs video frequency
// as the threshold.
if (time_diff_samples < 450000) {
// Note we calculate in Q4 to avoid using float.
int32_t jitter_diff_q4 = (time_diff_samples << 4) - jitter_q4_;
jitter_q4_ += ((jitter_diff_q4 + 8) >> 4);
}
// Extended jitter report, RFC 5450.
// Actual network jitter, excluding the source-introduced jitter.
int32_t time_diff_samples_ext =
(receive_time_rtp - last_receive_time_rtp) -
((header.timestamp +
header.extension.transmissionTimeOffset) -
(last_received_timestamp_ +
last_received_transmission_time_offset_));
time_diff_samples_ext = abs(time_diff_samples_ext);
if (time_diff_samples_ext < 450000) {
int32_t jitter_diffQ4TransmissionTimeOffset =
(time_diff_samples_ext << 4) - jitter_q4_transmission_time_offset_;
jitter_q4_transmission_time_offset_ +=
((jitter_diffQ4TransmissionTimeOffset + 8) >> 4);
}
}
void StreamStatisticianImpl::FecPacketReceived() {
CriticalSectionScoped cs(stream_lock_.get());
++receive_counters_.fec_packets;
rtp_callback_->DataCountersUpdated(receive_counters_, ssrc_);
}
void StreamStatisticianImpl::SetMaxReorderingThreshold(
@ -178,7 +182,7 @@ bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
bool reset) {
{
CriticalSectionScoped cs(stream_lock_.get());
if (received_seq_first_ == 0 && received_byte_count_ == 0) {
if (received_seq_first_ == 0 && receive_counters_.bytes == 0) {
// We have not received anything.
return false;
}
@ -197,6 +201,8 @@ bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
}
rtcp_callback_->StatisticsUpdated(*statistics, ssrc_);
rtp_callback_->DataCountersUpdated(receive_counters_, ssrc_);
return true;
}
@ -219,7 +225,8 @@ RtcpStatistics StreamStatisticianImpl::CalculateStatistics() {
// Number of received RTP packets since last report, counts all packets but
// not re-transmissions.
uint32_t rec_since_last =
received_inorder_packet_count_ - last_report_inorder_packets_;
(receive_counters_.packets - receive_counters_.retransmitted_packets) -
last_report_inorder_packets_;
// With NACK we don't know the expected retransmissions during the last
// second. We know how many "old" packets we have received. We just count
@ -231,7 +238,7 @@ RtcpStatistics StreamStatisticianImpl::CalculateStatistics() {
// re-transmitted. We use RTT to decide if a packet is re-ordered or
// re-transmitted.
uint32_t retransmitted_packets =
received_retransmitted_packets_ - last_report_old_packets_;
receive_counters_.retransmitted_packets - last_report_old_packets_;
rec_since_last += retransmitted_packets;
int32_t missing = 0;
@ -258,8 +265,9 @@ RtcpStatistics StreamStatisticianImpl::CalculateStatistics() {
last_reported_statistics_ = stats;
// Only for report blocks in RTCP SR and RR.
last_report_inorder_packets_ = received_inorder_packet_count_;
last_report_old_packets_ = received_retransmitted_packets_;
last_report_inorder_packets_ =
receive_counters_.packets - receive_counters_.retransmitted_packets;
last_report_old_packets_ = receive_counters_.retransmitted_packets;
last_report_seq_max_ = received_seq_max_;
return stats;
@ -269,11 +277,11 @@ void StreamStatisticianImpl::GetDataCounters(
uint32_t* bytes_received, uint32_t* packets_received) const {
CriticalSectionScoped cs(stream_lock_.get());
if (bytes_received) {
*bytes_received = received_byte_count_;
*bytes_received = receive_counters_.bytes + receive_counters_.header_bytes +
receive_counters_.padding_bytes;
}
if (packets_received) {
*packets_received =
received_retransmitted_packets_ + received_inorder_packet_count_;
*packets_received = receive_counters_.packets;
}
}
@ -358,7 +366,8 @@ ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
: clock_(clock),
receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()),
last_rate_update_ms_(0),
rtcp_stats_callback_(NULL) {}
rtcp_stats_callback_(NULL),
rtp_stats_callback_(NULL) {}
ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
while (!statisticians_.empty()) {
@ -368,7 +377,8 @@ ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
}
void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
size_t bytes, bool old_packet) {
size_t bytes,
bool retransmitted) {
StatisticianImplMap::iterator it;
{
CriticalSectionScoped cs(receive_statistics_lock_.get());
@ -376,11 +386,18 @@ void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
if (it == statisticians_.end()) {
std::pair<StatisticianImplMap::iterator, uint32_t> insert_result =
statisticians_.insert(std::make_pair(
header.ssrc, new StreamStatisticianImpl(clock_, this)));
header.ssrc, new StreamStatisticianImpl(clock_, this, this)));
it = insert_result.first;
}
}
it->second->IncomingPacket(header, bytes, old_packet);
it->second->IncomingPacket(header, bytes, retransmitted);
}
void ReceiveStatisticsImpl::FecPacketReceived(uint32_t ssrc) {
CriticalSectionScoped cs(receive_statistics_lock_.get());
StatisticianImplMap::iterator it = statisticians_.find(ssrc);
assert(it != statisticians_.end());
it->second->FecPacketReceived();
}
void ReceiveStatisticsImpl::ChangeSsrc(uint32_t from_ssrc, uint32_t to_ssrc) {
@ -461,10 +478,28 @@ void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics,
}
}
void ReceiveStatisticsImpl::RegisterRtpStatisticsCallback(
StreamDataCountersCallback* callback) {
CriticalSectionScoped cs(receive_statistics_lock_.get());
if (callback != NULL)
assert(rtp_stats_callback_ == NULL);
rtp_stats_callback_ = callback;
}
void ReceiveStatisticsImpl::DataCountersUpdated(const StreamDataCounters& stats,
uint32_t ssrc) {
CriticalSectionScoped cs(receive_statistics_lock_.get());
if (rtp_stats_callback_) {
rtp_stats_callback_->DataCountersUpdated(stats, ssrc);
}
}
void NullReceiveStatistics::IncomingPacket(const RTPHeader& rtp_header,
size_t bytes,
bool retransmitted) {}
void NullReceiveStatistics::FecPacketReceived(uint32_t ssrc) {}
StatisticianMap NullReceiveStatistics::GetActiveStatisticians() const {
return StatisticianMap();
}
@ -484,4 +519,7 @@ int32_t NullReceiveStatistics::Process() { return 0; }
void NullReceiveStatistics::RegisterRtcpStatisticsCallback(
RtcpStatisticsCallback* callback) {}
void NullReceiveStatistics::RegisterRtpStatisticsCallback(
StreamDataCountersCallback* callback) {}
} // namespace webrtc