Add Thread-safe wrapper for RtcpTransceiver

Bug: webrtc:8239
Change-Id: I4cc2f7f2b27c764e1aae734f933902102b345614
Reviewed-on: https://webrtc-review.googlesource.com/21680
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#20714}
This commit is contained in:
Danil Chapovalov
2017-11-16 14:35:32 +01:00
committed by Commit Bot
parent fe4d673393
commit c0fd5f97a8
7 changed files with 313 additions and 16 deletions

View File

@ -221,10 +221,12 @@ rtc_static_library("rtp_rtcp") {
rtc_source_set("rtcp_transceiver") {
public = [
"source/rtcp_transceiver.h",
"source/rtcp_transceiver_config.h",
"source/rtcp_transceiver_impl.h",
]
sources = [
"source/rtcp_transceiver.cc",
"source/rtcp_transceiver_config.cc",
"source/rtcp_transceiver_impl.cc",
]
@ -349,6 +351,7 @@ if (rtc_include_tests) {
"source/rtcp_receiver_unittest.cc",
"source/rtcp_sender_unittest.cc",
"source/rtcp_transceiver_impl_unittest.cc",
"source/rtcp_transceiver_unittest.cc",
"source/rtp_fec_unittest.cc",
"source/rtp_format_h264_unittest.cc",
"source/rtp_format_video_generic_unittest.cc",

View File

@ -0,0 +1,93 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/rtp_rtcp/source/rtcp_transceiver.h"
#include <utility>
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/ptr_util.h"
#include "rtc_base/timeutils.h"
namespace webrtc {
RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config)
: task_queue_(config.task_queue),
rtcp_transceiver_(rtc::MakeUnique<RtcpTransceiverImpl>(config)),
ptr_factory_(rtcp_transceiver_.get()),
// Creating first weak ptr can be done on any thread, but is not
// thread-safe, thus do it at construction. Creating second (e.g. making a
// copy) is thread-safe.
ptr_(ptr_factory_.GetWeakPtr()) {
RTC_DCHECK(task_queue_);
}
RtcpTransceiver::~RtcpTransceiver() {
if (task_queue_->IsCurrent())
return;
rtc::Event done(false, false);
// TODO(danilchap): Merge cleanup into main closure when task queue does not
// silently drop tasks.
task_queue_->PostTask(rtc::NewClosure(
[this] {
// Destructor steps that has to run on the task_queue_.
ptr_factory_.InvalidateWeakPtrs();
rtcp_transceiver_.reset();
},
/*cleanup=*/[&done] { done.Set(); }));
// Wait until destruction is complete to be sure weak pointers invalidated and
// rtcp_transceiver destroyed on the queue while |this| still valid.
done.Wait(rtc::Event::kForever);
RTC_CHECK(!rtcp_transceiver_) << "Task queue is too busy to handle rtcp";
}
void RtcpTransceiver::ReceivePacket(rtc::CopyOnWriteBuffer packet) {
rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
int64_t now_us = rtc::TimeMicros();
task_queue_->PostTask([ptr, packet, now_us] {
if (ptr)
ptr->ReceivePacket(packet, now_us);
});
}
void RtcpTransceiver::SendCompoundPacket() {
rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
task_queue_->PostTask([ptr] {
if (ptr)
ptr->SendCompoundPacket();
});
}
void RtcpTransceiver::SetRemb(int bitrate_bps, std::vector<uint32_t> ssrcs) {
// TODO(danilchap): Replace with lambda with move capture when available.
struct SetRembClosure {
void operator()() {
if (ptr)
ptr->SetRemb(bitrate_bps, std::move(ssrcs));
}
rtc::WeakPtr<RtcpTransceiverImpl> ptr;
int bitrate_bps;
std::vector<uint32_t> ssrcs;
};
task_queue_->PostTask(SetRembClosure{ptr_, bitrate_bps, std::move(ssrcs)});
}
void RtcpTransceiver::UnsetRemb() {
rtc::WeakPtr<RtcpTransceiverImpl> ptr = ptr_;
task_queue_->PostTask([ptr] {
if (ptr)
ptr->UnsetRemb();
});
}
} // namespace webrtc

View File

@ -0,0 +1,62 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_H_
#define MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_H_
#include <memory>
#include <string>
#include <vector>
#include "modules/rtp_rtcp/source/rtcp_transceiver_config.h"
#include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h"
#include "rtc_base/constructormagic.h"
#include "rtc_base/copyonwritebuffer.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/weak_ptr.h"
namespace webrtc {
//
// Manage incoming and outgoing rtcp messages for multiple BUNDLED streams.
//
// This class is thread-safe wrapper of RtcpTransceiverImpl
class RtcpTransceiver {
public:
explicit RtcpTransceiver(const RtcpTransceiverConfig& config);
~RtcpTransceiver();
// Handles incoming rtcp packets.
void ReceivePacket(rtc::CopyOnWriteBuffer packet);
// Sends RTCP packets starting with a sender or receiver report.
void SendCompoundPacket();
// (REMB) Receiver Estimated Max Bitrate.
// Includes REMB in following compound packets.
void SetRemb(int bitrate_bps, std::vector<uint32_t> ssrcs);
// Stops sending REMB in following compound packets.
void UnsetRemb();
private:
rtc::TaskQueue* const task_queue_;
std::unique_ptr<RtcpTransceiverImpl> rtcp_transceiver_;
rtc::WeakPtrFactory<RtcpTransceiverImpl> ptr_factory_;
// TaskQueue, and thus tasks posted to it, may outlive this.
// Thus when Posting task class always pass copy of the weak_ptr to access
// the RtcpTransceiver and never guarantee it still will be alive when task
// runs.
rtc::WeakPtr<RtcpTransceiverImpl> ptr_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcpTransceiver);
};
} // namespace webrtc
#endif // MODULES_RTP_RTCP_SOURCE_RTCP_TRANSCEIVER_H_

View File

@ -25,6 +25,7 @@
#include "rtc_base/checks.h"
#include "rtc_base/ptr_util.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/timeutils.h"
namespace webrtc {
namespace {
@ -73,18 +74,19 @@ RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
: config_(config), ptr_factory_(this) {
RTC_CHECK(config_.Validate());
if (config_.schedule_periodic_compound_packets)
ReschedulePeriodicCompoundPackets(config_.initial_report_delay_ms);
SchedulePeriodicCompoundPackets(config_.initial_report_delay_ms);
}
RtcpTransceiverImpl::~RtcpTransceiverImpl() = default;
void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet) {
void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet,
int64_t now_us) {
while (!packet.empty()) {
rtcp::CommonHeader rtcp_block;
if (!rtcp_block.Parse(packet.data(), packet.size()))
return;
HandleReceivedPacket(rtcp_block);
HandleReceivedPacket(rtcp_block, now_us);
// TODO(danilchap): Use packet.remove_prefix() when that function exists.
packet = packet.subview(rtcp_block.packet_size());
@ -93,8 +95,11 @@ void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet) {
void RtcpTransceiverImpl::SendCompoundPacket() {
SendPacket();
if (config_.schedule_periodic_compound_packets)
ReschedulePeriodicCompoundPackets(config_.report_period_ms);
if (config_.schedule_periodic_compound_packets) {
// Stop existent send task.
ptr_factory_.InvalidateWeakPtrs();
SchedulePeriodicCompoundPackets(config_.report_period_ms);
}
}
void RtcpTransceiverImpl::SetRemb(int bitrate_bps,
@ -113,7 +118,8 @@ void RtcpTransceiverImpl::UnsetRemb() {
}
void RtcpTransceiverImpl::HandleReceivedPacket(
const rtcp::CommonHeader& rtcp_packet_header) {
const rtcp::CommonHeader& rtcp_packet_header,
int64_t now_us) {
switch (rtcp_packet_header.type()) {
case rtcp::SenderReport::kPacketType: {
rtcp::SenderReport sender_report;
@ -121,14 +127,14 @@ void RtcpTransceiverImpl::HandleReceivedPacket(
return;
SenderReportTimes& last =
last_received_sender_reports_[sender_report.sender_ssrc()];
last.local_received_time_us = rtc::TimeMicros();
last.local_received_time_us = now_us;
last.remote_sent_time = sender_report.ntp();
break;
}
}
}
void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) {
void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) {
class SendPeriodicCompoundPacket : public rtc::QueuedTask {
public:
SendPeriodicCompoundPacket(rtc::TaskQueue* task_queue,
@ -150,10 +156,7 @@ void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) {
};
RTC_DCHECK(config_.schedule_periodic_compound_packets);
RTC_DCHECK(config_.task_queue->IsCurrent());
// Stop existent send task if there is one.
ptr_factory_.InvalidateWeakPtrs();
auto task = rtc::MakeUnique<SendPeriodicCompoundPacket>(
config_.task_queue, ptr_factory_.GetWeakPtr());
if (delay_ms > 0)

View File

@ -37,7 +37,7 @@ class RtcpTransceiverImpl {
~RtcpTransceiverImpl();
// Handles incoming rtcp packets.
void ReceivePacket(rtc::ArrayView<const uint8_t> packet);
void ReceivePacket(rtc::ArrayView<const uint8_t> packet, int64_t now_us);
// Sends RTCP packets starting with a sender or receiver report.
void SendCompoundPacket();
@ -54,9 +54,10 @@ class RtcpTransceiverImpl {
NtpTime remote_sent_time;
};
void HandleReceivedPacket(const rtcp::CommonHeader& rtcp_packet_header);
void HandleReceivedPacket(const rtcp::CommonHeader& rtcp_packet_header,
int64_t now_us);
void ReschedulePeriodicCompoundPackets(int64_t delay_ms);
void SchedulePeriodicCompoundPackets(int64_t delay_ms);
// Sends RTCP packets.
void SendPacket();
// Generate Report Blocks to be send in Sender or Receiver Report.

View File

@ -374,7 +374,7 @@ TEST(RtcpTransceiverImplTest,
sr.SetSenderSsrc(kRemoteSsrc2);
sr.SetNtp(kRemoteNtp);
auto raw_packet = sr.Build();
rtcp_transceiver.ReceivePacket(raw_packet);
rtcp_transceiver.ReceivePacket(raw_packet, /*now_us=*/0);
// Trigger sending ReceiverReport.
RtcpPacketParser rtcp_parser;
@ -419,7 +419,7 @@ TEST(RtcpTransceiverImplTest,
SenderReport sr;
sr.SetSenderSsrc(remote_ssrc);
auto raw_packet = sr.Build();
rtcp_transceiver.ReceivePacket(raw_packet);
rtcp_transceiver.ReceivePacket(raw_packet, rtc::TimeMicros());
};
receive_sender_report(kRemoteSsrc1);

View File

@ -0,0 +1,135 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/rtp_rtcp/source/rtcp_transceiver.h"
#include "rtc_base/event.h"
#include "rtc_base/ptr_util.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/mock_transport.h"
namespace {
using ::testing::AtLeast;
using ::testing::InvokeWithoutArgs;
using ::testing::NiceMock;
using ::testing::_;
using ::webrtc::MockTransport;
using ::webrtc::RtcpTransceiver;
using ::webrtc::RtcpTransceiverConfig;
void WaitPostedTasks(rtc::TaskQueue* queue) {
rtc::Event done(false, false);
queue->PostTask([&done] { done.Set(); });
ASSERT_TRUE(done.Wait(1000));
}
TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) {
rtc::TaskQueue queue("rtcp");
MockTransport outgoing_transport;
RtcpTransceiverConfig config;
config.outgoing_transport = &outgoing_transport;
config.task_queue = &queue;
EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
.WillRepeatedly(InvokeWithoutArgs([&] {
EXPECT_TRUE(queue.IsCurrent());
return true;
}));
RtcpTransceiver rtcp_transceiver(config);
rtcp_transceiver.SendCompoundPacket();
WaitPostedTasks(&queue);
}
TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) {
rtc::TaskQueue queue("rtcp");
MockTransport outgoing_transport;
RtcpTransceiverConfig config;
config.outgoing_transport = &outgoing_transport;
config.task_queue = &queue;
EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
.WillRepeatedly(InvokeWithoutArgs([&] {
EXPECT_TRUE(queue.IsCurrent());
return true;
}));
std::unique_ptr<RtcpTransceiver> rtcp_transceiver;
queue.PostTask([&] {
rtcp_transceiver = rtc::MakeUnique<RtcpTransceiver>(config);
rtcp_transceiver->SendCompoundPacket();
});
WaitPostedTasks(&queue);
}
TEST(RtcpTransceiverTest, CanBeDestoryedOnTaskQueue) {
rtc::TaskQueue queue("rtcp");
NiceMock<MockTransport> outgoing_transport;
RtcpTransceiverConfig config;
config.outgoing_transport = &outgoing_transport;
config.task_queue = &queue;
auto rtcp_transceiver = rtc::MakeUnique<RtcpTransceiver>(config);
queue.PostTask([&] { rtcp_transceiver.reset(); });
WaitPostedTasks(&queue);
}
TEST(RtcpTransceiverTest, CanCallSendCompoundPacketFromAnyThread) {
MockTransport outgoing_transport;
rtc::TaskQueue queue("rtcp");
RtcpTransceiverConfig config;
config.outgoing_transport = &outgoing_transport;
config.task_queue = &queue;
EXPECT_CALL(outgoing_transport, SendRtcp(_, _))
// If test is slow, a periodic task may send an extra packet.
.Times(AtLeast(3))
.WillRepeatedly(InvokeWithoutArgs([&] {
EXPECT_TRUE(queue.IsCurrent());
return true;
}));
RtcpTransceiver rtcp_transceiver(config);
// Call from the construction thread.
rtcp_transceiver.SendCompoundPacket();
// Call from the same queue transceiver use for processing.
queue.PostTask([&] { rtcp_transceiver.SendCompoundPacket(); });
// Call from unrelated task queue.
rtc::TaskQueue queue_send("send_packet");
queue_send.PostTask([&] { rtcp_transceiver.SendCompoundPacket(); });
WaitPostedTasks(&queue_send);
WaitPostedTasks(&queue);
}
TEST(RtcpTransceiverTest, DoesntSendPacketsAfterDestruction) {
MockTransport outgoing_transport;
rtc::TaskQueue queue("rtcp");
RtcpTransceiverConfig config;
config.outgoing_transport = &outgoing_transport;
config.task_queue = &queue;
config.schedule_periodic_compound_packets = false;
EXPECT_CALL(outgoing_transport, SendRtcp(_, _)).Times(0);
auto rtcp_transceiver = rtc::MakeUnique<RtcpTransceiver>(config);
rtc::Event pause(false, false);
queue.PostTask([&] {
pause.Wait(rtc::Event::kForever);
rtcp_transceiver.reset();
});
rtcp_transceiver->SendCompoundPacket();
pause.Set();
WaitPostedTasks(&queue);
EXPECT_FALSE(rtcp_transceiver);
}
} // namespace