Add Incoming packet handler to RtcpTransceiver

Bug: webrtc:8239
Change-Id: Iebe1b6a2649f01e4f6e3780ac96cb05611d8671c
Reviewed-on: https://webrtc-review.googlesource.com/17560
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#20624}
This commit is contained in:
Danil Chapovalov
2017-11-09 15:42:28 +01:00
committed by Commit Bot
parent 7501b1c3d1
commit d2f37d85bd
4 changed files with 184 additions and 14 deletions

View File

@ -234,6 +234,7 @@ rtc_source_set("rtcp_transceiver") {
"../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_base_approved",
"../../rtc_base:rtc_task_queue", "../../rtc_base:rtc_task_queue",
"../../rtc_base:weak_ptr", "../../rtc_base:weak_ptr",
"../../system_wrappers:system_wrappers",
] ]
} }
@ -398,6 +399,7 @@ if (rtc_include_tests) {
"../../common_video:common_video", "../../common_video:common_video",
"../../logging:rtc_event_log_api", "../../logging:rtc_event_log_api",
"../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_base_approved",
"../../rtc_base:rtc_base_tests_utils",
"../../rtc_base:rtc_task_queue", "../../rtc_base:rtc_task_queue",
"../../system_wrappers:system_wrappers", "../../system_wrappers:system_wrappers",
"../../test:field_trial", "../../test:field_trial",

View File

@ -11,15 +11,17 @@
#include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h" #include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h"
#include <utility> #include <utility>
#include <vector>
#include "api/call/transport.h" #include "api/call/transport.h"
#include "modules/rtp_rtcp/include/receive_statistics.h" #include "modules/rtp_rtcp/include/receive_statistics.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtcp_packet.h" #include "modules/rtp_rtcp/source/rtcp_packet.h"
#include "modules/rtp_rtcp/source/rtcp_packet/common_header.h"
#include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h" #include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h"
#include "modules/rtp_rtcp/source/rtcp_packet/report_block.h" #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
#include "modules/rtp_rtcp/source/rtcp_packet/sdes.h" #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h"
#include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
#include "modules/rtp_rtcp/source/time_util.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/ptr_util.h" #include "rtc_base/ptr_util.h"
#include "rtc_base/task_queue.h" #include "rtc_base/task_queue.h"
@ -76,12 +78,41 @@ RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
RtcpTransceiverImpl::~RtcpTransceiverImpl() = default; RtcpTransceiverImpl::~RtcpTransceiverImpl() = default;
void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet) {
while (!packet.empty()) {
rtcp::CommonHeader rtcp_block;
if (!rtcp_block.Parse(packet.data(), packet.size()))
return;
HandleReceivedPacket(rtcp_block);
// TODO(danilchap): Use packet.remove_prefix() when that function exists.
packet = packet.subview(rtcp_block.packet_size());
}
}
void RtcpTransceiverImpl::SendCompoundPacket() { void RtcpTransceiverImpl::SendCompoundPacket() {
SendPacket(); SendPacket();
if (config_.schedule_periodic_compound_packets) if (config_.schedule_periodic_compound_packets)
ReschedulePeriodicCompoundPackets(config_.report_period_ms); ReschedulePeriodicCompoundPackets(config_.report_period_ms);
} }
void RtcpTransceiverImpl::HandleReceivedPacket(
const rtcp::CommonHeader& rtcp_packet_header) {
switch (rtcp_packet_header.type()) {
case rtcp::SenderReport::kPacketType: {
rtcp::SenderReport sender_report;
if (!sender_report.Parse(rtcp_packet_header))
return;
SenderReportTimes& last =
last_received_sender_reports_[sender_report.sender_ssrc()];
last.local_received_time_us = rtc::TimeMicros();
last.remote_sent_time = sender_report.ntp();
break;
}
}
}
void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) { void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) {
class SendPeriodicCompoundPacket : public rtc::QueuedTask { class SendPeriodicCompoundPacket : public rtc::QueuedTask {
public: public:
@ -119,19 +150,11 @@ void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) {
void RtcpTransceiverImpl::SendPacket() { void RtcpTransceiverImpl::SendPacket() {
PacketSender sender(config_.outgoing_transport, config_.max_packet_size); PacketSender sender(config_.outgoing_transport, config_.max_packet_size);
rtcp::ReceiverReport rr; rtcp::ReceiverReport receiver_report;
rr.SetSenderSsrc(config_.feedback_ssrc); receiver_report.SetSenderSsrc(config_.feedback_ssrc);
if (config_.receive_statistics) { receiver_report.SetReportBlocks(CreateReportBlocks());
// TODO(danilchap): Support sending more than sender.AppendPacket(receiver_report);
// |ReceiverReport::kMaxNumberOfReportBlocks| per compound rtcp packet.
std::vector<rtcp::ReportBlock> report_blocks =
config_.receive_statistics->RtcpReportBlocks(
rtcp::ReceiverReport::kMaxNumberOfReportBlocks);
// TODO(danilchap): Fill in LastSr/DelayLastSr fields of report blocks
// when RtcpTransceiver handles incoming sender reports.
rr.SetReportBlocks(std::move(report_blocks));
}
sender.AppendPacket(rr);
if (!config_.cname.empty()) { if (!config_.cname.empty()) {
rtcp::Sdes sdes; rtcp::Sdes sdes;
bool added = sdes.AddCName(config_.feedback_ssrc, config_.cname); bool added = sdes.AddCName(config_.feedback_ssrc, config_.cname);
@ -143,4 +166,24 @@ void RtcpTransceiverImpl::SendPacket() {
sender.Send(); sender.Send();
} }
std::vector<rtcp::ReportBlock> RtcpTransceiverImpl::CreateReportBlocks() {
if (!config_.receive_statistics)
return {};
// TODO(danilchap): Support sending more than
// |ReceiverReport::kMaxNumberOfReportBlocks| per compound rtcp packet.
std::vector<rtcp::ReportBlock> report_blocks =
config_.receive_statistics->RtcpReportBlocks(
rtcp::ReceiverReport::kMaxNumberOfReportBlocks);
for (rtcp::ReportBlock& report_block : report_blocks) {
auto it = last_received_sender_reports_.find(report_block.source_ssrc());
if (it == last_received_sender_reports_.end())
continue;
const SenderReportTimes& last_sender_report = it->second;
report_block.SetLastSr(CompactNtp(last_sender_report.remote_sent_time));
report_block.SetDelayLastSr(SaturatedUsToCompactNtp(
rtc::TimeMicros() - last_sender_report.local_received_time_us));
}
return report_blocks;
}
} // namespace webrtc } // namespace webrtc

View File

@ -11,13 +11,18 @@
#ifndef MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_IMPL_H_ #ifndef MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_IMPL_H_
#define MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_IMPL_H_ #define MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_IMPL_H_
#include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector>
#include "api/array_view.h" #include "api/array_view.h"
#include "modules/rtp_rtcp/source/rtcp_packet/common_header.h"
#include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
#include "modules/rtp_rtcp/source/rtcp_transceiver_config.h" #include "modules/rtp_rtcp/source/rtcp_transceiver_config.h"
#include "rtc_base/constructormagic.h" #include "rtc_base/constructormagic.h"
#include "rtc_base/weak_ptr.h" #include "rtc_base/weak_ptr.h"
#include "system_wrappers/include/ntp_time.h"
namespace webrtc { namespace webrtc {
// //
@ -29,16 +34,29 @@ class RtcpTransceiverImpl {
explicit RtcpTransceiverImpl(const RtcpTransceiverConfig& config); explicit RtcpTransceiverImpl(const RtcpTransceiverConfig& config);
~RtcpTransceiverImpl(); ~RtcpTransceiverImpl();
// Handles incoming rtcp packets.
void ReceivePacket(rtc::ArrayView<const uint8_t> packet);
// Sends RTCP packets starting with a sender or receiver report. // Sends RTCP packets starting with a sender or receiver report.
void SendCompoundPacket(); void SendCompoundPacket();
private: private:
struct SenderReportTimes {
int64_t local_received_time_us;
NtpTime remote_sent_time;
};
void HandleReceivedPacket(const rtcp::CommonHeader& rtcp_packet_header);
void ReschedulePeriodicCompoundPackets(int64_t delay_ms); void ReschedulePeriodicCompoundPackets(int64_t delay_ms);
// Sends RTCP packets. // Sends RTCP packets.
void SendPacket(); void SendPacket();
// Generate Report Blocks to be send in Sender or Receiver Report.
std::vector<rtcp::ReportBlock> CreateReportBlocks();
const RtcpTransceiverConfig config_; const RtcpTransceiverConfig config_;
std::map<uint32_t, SenderReportTimes> last_received_sender_reports_;
rtc::WeakPtrFactory<RtcpTransceiverImpl> ptr_factory_; rtc::WeakPtrFactory<RtcpTransceiverImpl> ptr_factory_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcpTransceiverImpl); RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcpTransceiverImpl);

View File

@ -13,7 +13,9 @@
#include <vector> #include <vector>
#include "modules/rtp_rtcp/include/receive_statistics.h" #include "modules/rtp_rtcp/include/receive_statistics.h"
#include "modules/rtp_rtcp/source/time_util.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
#include "rtc_base/fakeclock.h"
#include "rtc_base/ptr_util.h" #include "rtc_base/ptr_util.h"
#include "rtc_base/task_queue.h" #include "rtc_base/task_queue.h"
#include "test/gmock.h" #include "test/gmock.h"
@ -27,10 +29,14 @@ using ::testing::_;
using ::testing::Invoke; using ::testing::Invoke;
using ::testing::Return; using ::testing::Return;
using ::testing::SizeIs; using ::testing::SizeIs;
using ::webrtc::CompactNtp;
using ::webrtc::CompactNtpRttToMs;
using ::webrtc::MockTransport; using ::webrtc::MockTransport;
using ::webrtc::NtpTime;
using ::webrtc::RtcpTransceiverConfig; using ::webrtc::RtcpTransceiverConfig;
using ::webrtc::RtcpTransceiverImpl; using ::webrtc::RtcpTransceiverImpl;
using ::webrtc::rtcp::ReportBlock; using ::webrtc::rtcp::ReportBlock;
using ::webrtc::rtcp::SenderReport;
using ::webrtc::test::RtcpPacketParser; using ::webrtc::test::RtcpPacketParser;
class MockReceiveStatisticsProvider : public webrtc::ReceiveStatisticsProvider { class MockReceiveStatisticsProvider : public webrtc::ReceiveStatisticsProvider {
@ -233,4 +239,105 @@ TEST(RtcpTransceiverImplTest, ReceiverReportUsesReceiveStatistics) {
kMediaSsrc); kMediaSsrc);
} }
// TODO(danilchap): Write test ReceivePacket handles several rtcp_packets
// stacked together when callbacks will be implemented that can be used for
// cleaner expectations.
TEST(RtcpTransceiverImplTest,
WhenSendsReceiverReportSetsLastSenderReportTimestampPerRemoteSsrc) {
const uint32_t kRemoteSsrc1 = 4321;
const uint32_t kRemoteSsrc2 = 5321;
MockTransport outgoing_transport;
std::vector<ReportBlock> statistics_report_blocks(2);
statistics_report_blocks[0].SetMediaSsrc(kRemoteSsrc1);
statistics_report_blocks[1].SetMediaSsrc(kRemoteSsrc2);
MockReceiveStatisticsProvider receive_statistics;
EXPECT_CALL(receive_statistics, RtcpReportBlocks(_))
.WillOnce(Return(statistics_report_blocks));
RtcpTransceiverConfig config;
config.schedule_periodic_compound_packets = false;
config.outgoing_transport = &outgoing_transport;
config.receive_statistics = &receive_statistics;
RtcpTransceiverImpl rtcp_transceiver(config);
const NtpTime kRemoteNtp(0x9876543211);
// Receive SenderReport for RemoteSsrc2, but no report for RemoteSsrc1.
SenderReport sr;
sr.SetSenderSsrc(kRemoteSsrc2);
sr.SetNtp(kRemoteNtp);
auto raw_packet = sr.Build();
rtcp_transceiver.ReceivePacket(raw_packet);
// Trigger sending ReceiverReport.
RtcpPacketParser rtcp_parser;
EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
.WillOnce(Invoke(&rtcp_parser, &RtcpPacketParser::Parse));
rtcp_transceiver.SendCompoundPacket();
EXPECT_GT(rtcp_parser.receiver_report()->num_packets(), 0);
const auto& report_blocks = rtcp_parser.receiver_report()->report_blocks();
ASSERT_EQ(report_blocks.size(), 2u);
// RtcpTransceiverImpl doesn't guarantee order of the report blocks
// match result of ReceiveStatisticsProvider::RtcpReportBlocks callback,
// but for simplicity of the test asume it is the same.
ASSERT_EQ(report_blocks[0].source_ssrc(), kRemoteSsrc1);
// No matching Sender Report for kRemoteSsrc1, LastSR fields has to be 0.
EXPECT_EQ(report_blocks[0].last_sr(), 0u);
ASSERT_EQ(report_blocks[1].source_ssrc(), kRemoteSsrc2);
EXPECT_EQ(report_blocks[1].last_sr(), CompactNtp(kRemoteNtp));
}
TEST(RtcpTransceiverImplTest,
WhenSendsReceiverReportCalculatesDelaySinceLastSenderReport) {
const uint32_t kRemoteSsrc1 = 4321;
const uint32_t kRemoteSsrc2 = 5321;
rtc::ScopedFakeClock clock;
MockTransport outgoing_transport;
std::vector<ReportBlock> statistics_report_blocks(2);
statistics_report_blocks[0].SetMediaSsrc(kRemoteSsrc1);
statistics_report_blocks[1].SetMediaSsrc(kRemoteSsrc2);
MockReceiveStatisticsProvider receive_statistics;
EXPECT_CALL(receive_statistics, RtcpReportBlocks(_))
.WillOnce(Return(statistics_report_blocks));
RtcpTransceiverConfig config;
config.schedule_periodic_compound_packets = false;
config.outgoing_transport = &outgoing_transport;
config.receive_statistics = &receive_statistics;
RtcpTransceiverImpl rtcp_transceiver(config);
auto receive_sender_report = [&rtcp_transceiver](uint32_t remote_ssrc) {
SenderReport sr;
sr.SetSenderSsrc(remote_ssrc);
auto raw_packet = sr.Build();
rtcp_transceiver.ReceivePacket(raw_packet);
};
receive_sender_report(kRemoteSsrc1);
clock.AdvanceTimeMicros(100 * rtc::kNumMicrosecsPerMillisec);
receive_sender_report(kRemoteSsrc2);
clock.AdvanceTimeMicros(100 * rtc::kNumMicrosecsPerMillisec);
// Trigger ReceiverReport back.
RtcpPacketParser rtcp_parser;
EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
.WillOnce(Invoke(&rtcp_parser, &RtcpPacketParser::Parse));
rtcp_transceiver.SendCompoundPacket();
EXPECT_GT(rtcp_parser.receiver_report()->num_packets(), 0);
const auto& report_blocks = rtcp_parser.receiver_report()->report_blocks();
ASSERT_EQ(report_blocks.size(), 2u);
// RtcpTransceiverImpl doesn't guarantee order of the report blocks
// match result of ReceiveStatisticsProvider::RtcpReportBlocks callback,
// but for simplicity of the test asume it is the same.
ASSERT_EQ(report_blocks[0].source_ssrc(), kRemoteSsrc1);
EXPECT_EQ(CompactNtpRttToMs(report_blocks[0].delay_since_last_sr()), 200);
ASSERT_EQ(report_blocks[1].source_ssrc(), kRemoteSsrc2);
EXPECT_EQ(CompactNtpRttToMs(report_blocks[1].delay_since_last_sr()), 100);
}
} // namespace } // namespace