Make RtcpTransceiver destructor non-blocking
At cost of removing assumption callbacks can't be used after destructor. Bug: webrtc:8239 Change-Id: Id79f7553528cf6c102d3ee0bf7aa2de5b0437d2a Reviewed-on: https://webrtc-review.googlesource.com/98860 Reviewed-by: Per Kjellander <perkj@webrtc.org> Commit-Queue: Danil Chapovalov <danilchap@webrtc.org> Cr-Commit-Position: refs/heads/master@{#24632}
This commit is contained in:

committed by
Commit Bot

parent
d7027dc081
commit
792df6b4b9
@ -19,6 +19,12 @@
|
|||||||
#include "rtc_base/timeutils.h"
|
#include "rtc_base/timeutils.h"
|
||||||
|
|
||||||
namespace webrtc {
|
namespace webrtc {
|
||||||
|
namespace {
|
||||||
|
struct Destructor {
|
||||||
|
void operator()() { rtcp_transceiver = nullptr; }
|
||||||
|
std::unique_ptr<RtcpTransceiverImpl> rtcp_transceiver;
|
||||||
|
};
|
||||||
|
} // namespace
|
||||||
|
|
||||||
RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config)
|
RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config)
|
||||||
: task_queue_(config.task_queue),
|
: task_queue_(config.task_queue),
|
||||||
@ -29,29 +35,12 @@ RtcpTransceiver::RtcpTransceiver(const RtcpTransceiverConfig& config)
|
|||||||
RtcpTransceiver::~RtcpTransceiver() {
|
RtcpTransceiver::~RtcpTransceiver() {
|
||||||
if (!rtcp_transceiver_)
|
if (!rtcp_transceiver_)
|
||||||
return;
|
return;
|
||||||
RTC_CHECK(!task_queue_->IsCurrent());
|
task_queue_->PostTask(Destructor{std::move(rtcp_transceiver_)});
|
||||||
|
RTC_DCHECK(!rtcp_transceiver_);
|
||||||
rtc::Event done(false, false);
|
|
||||||
// TODO(danilchap): Merge cleanup into main closure when task queue does not
|
|
||||||
// silently drop tasks.
|
|
||||||
task_queue_->PostTask(rtc::NewClosure(
|
|
||||||
[this] {
|
|
||||||
// Destructor steps that better run on the task_queue_.
|
|
||||||
rtcp_transceiver_.reset();
|
|
||||||
},
|
|
||||||
/*cleanup=*/[&done] { done.Set(); }));
|
|
||||||
// Wait until destruction is complete to guarantee callbacks are not used
|
|
||||||
// after destructor returns.
|
|
||||||
done.Wait(rtc::Event::kForever);
|
|
||||||
RTC_CHECK(!rtcp_transceiver_) << "Task queue is too busy to handle rtcp";
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RtcpTransceiver::Stop(std::unique_ptr<rtc::QueuedTask> on_destroyed) {
|
void RtcpTransceiver::Stop(std::unique_ptr<rtc::QueuedTask> on_destroyed) {
|
||||||
RTC_DCHECK(rtcp_transceiver_);
|
RTC_DCHECK(rtcp_transceiver_);
|
||||||
struct Destructor {
|
|
||||||
void operator()() { rtcp_transceiver = nullptr; }
|
|
||||||
std::unique_ptr<RtcpTransceiverImpl> rtcp_transceiver;
|
|
||||||
};
|
|
||||||
task_queue_->PostTaskAndReply(Destructor{std::move(rtcp_transceiver_)},
|
task_queue_->PostTaskAndReply(Destructor{std::move(rtcp_transceiver_)},
|
||||||
std::move(on_destroyed));
|
std::move(on_destroyed));
|
||||||
RTC_DCHECK(!rtcp_transceiver_);
|
RTC_DCHECK(!rtcp_transceiver_);
|
||||||
|
@ -29,16 +29,18 @@ namespace webrtc {
|
|||||||
class RtcpTransceiver : public RtcpFeedbackSenderInterface {
|
class RtcpTransceiver : public RtcpFeedbackSenderInterface {
|
||||||
public:
|
public:
|
||||||
explicit RtcpTransceiver(const RtcpTransceiverConfig& config);
|
explicit RtcpTransceiver(const RtcpTransceiverConfig& config);
|
||||||
// Blocks unless Stop was called.
|
// Note that interfaces provided in constructor still might be used after the
|
||||||
// TODO(danilchap): Change destructor to never block by breaking assumption
|
// destructor. However they can only be used on the confic.task_queue.
|
||||||
// callbacks are not used after destruction.
|
// Use Stop function to get notified when they are no longer used or
|
||||||
|
// ensure those objects outlive the task queue.
|
||||||
~RtcpTransceiver() override;
|
~RtcpTransceiver() override;
|
||||||
|
|
||||||
// Start asynchronious destruction of the RtcpTransceiver.
|
// Start asynchronious destruction of the RtcpTransceiver.
|
||||||
// It is safe to call destructor right after Stop exits.
|
// It is safe to call destructor right after Stop exits.
|
||||||
// No other methods can be called.
|
// No other methods can be called.
|
||||||
// Note that observers provided in constructor or registered with AddObserver
|
// Note that interfaces provided in constructor or registered with AddObserver
|
||||||
// still might be used by the transceiver until |on_destroyed| runs.
|
// still might be used by the transceiver on the task queue
|
||||||
|
// until |on_destroyed| runs.
|
||||||
void Stop(std::unique_ptr<rtc::QueuedTask> on_destroyed);
|
void Stop(std::unique_ptr<rtc::QueuedTask> on_destroyed);
|
||||||
|
|
||||||
// Registers observer to be notified about incoming rtcp packets.
|
// Registers observer to be notified about incoming rtcp packets.
|
||||||
|
@ -47,8 +47,8 @@ void WaitPostedTasks(rtc::TaskQueue* queue) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) {
|
TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) {
|
||||||
rtc::TaskQueue queue("rtcp");
|
|
||||||
MockTransport outgoing_transport;
|
MockTransport outgoing_transport;
|
||||||
|
rtc::TaskQueue queue("rtcp");
|
||||||
RtcpTransceiverConfig config;
|
RtcpTransceiverConfig config;
|
||||||
config.outgoing_transport = &outgoing_transport;
|
config.outgoing_transport = &outgoing_transport;
|
||||||
config.task_queue = &queue;
|
config.task_queue = &queue;
|
||||||
@ -64,8 +64,8 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOffTaskQueue) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) {
|
TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) {
|
||||||
rtc::TaskQueue queue("rtcp");
|
|
||||||
MockTransport outgoing_transport;
|
MockTransport outgoing_transport;
|
||||||
|
rtc::TaskQueue queue("rtcp");
|
||||||
RtcpTransceiverConfig config;
|
RtcpTransceiverConfig config;
|
||||||
config.outgoing_transport = &outgoing_transport;
|
config.outgoing_transport = &outgoing_transport;
|
||||||
config.task_queue = &queue;
|
config.task_queue = &queue;
|
||||||
@ -83,39 +83,62 @@ TEST(RtcpTransceiverTest, SendsRtcpOnTaskQueueWhenCreatedOnTaskQueue) {
|
|||||||
WaitPostedTasks(&queue);
|
WaitPostedTasks(&queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(RtcpTransceiverTest, CanBeDestroyedOnTaskQueueAfterStop) {
|
TEST(RtcpTransceiverTest, CanBeDestroyedOnTaskQueue) {
|
||||||
rtc::TaskQueue queue("rtcp");
|
|
||||||
NiceMock<MockTransport> outgoing_transport;
|
NiceMock<MockTransport> outgoing_transport;
|
||||||
|
rtc::TaskQueue queue("rtcp");
|
||||||
RtcpTransceiverConfig config;
|
RtcpTransceiverConfig config;
|
||||||
config.outgoing_transport = &outgoing_transport;
|
config.outgoing_transport = &outgoing_transport;
|
||||||
config.task_queue = &queue;
|
config.task_queue = &queue;
|
||||||
auto rtcp_transceiver = absl::make_unique<RtcpTransceiver>(config);
|
auto rtcp_transceiver = absl::make_unique<RtcpTransceiver>(config);
|
||||||
rtcp_transceiver->Stop(rtc::NewClosure([] {}));
|
|
||||||
|
|
||||||
queue.PostTask([&] { rtcp_transceiver.reset(); });
|
queue.PostTask([&] {
|
||||||
|
// Insert a packet just before destruction to test for races.
|
||||||
|
rtcp_transceiver->SendCompoundPacket();
|
||||||
|
rtcp_transceiver.reset();
|
||||||
|
});
|
||||||
WaitPostedTasks(&queue);
|
WaitPostedTasks(&queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(RtcpTransceiverTest, CanBeDestroyedWithoutBlockingAfterStop) {
|
TEST(RtcpTransceiverTest, CanBeDestroyedWithoutBlocking) {
|
||||||
rtc::TaskQueue queue("rtcp");
|
rtc::TaskQueue queue("rtcp");
|
||||||
NiceMock<MockTransport> outgoing_transport;
|
NiceMock<MockTransport> outgoing_transport;
|
||||||
RtcpTransceiverConfig config;
|
RtcpTransceiverConfig config;
|
||||||
config.outgoing_transport = &outgoing_transport;
|
config.outgoing_transport = &outgoing_transport;
|
||||||
config.task_queue = &queue;
|
config.task_queue = &queue;
|
||||||
auto rtcp_transceiver = absl::make_unique<RtcpTransceiver>(config);
|
auto* rtcp_transceiver = new RtcpTransceiver(config);
|
||||||
rtcp_transceiver->SendCompoundPacket();
|
rtcp_transceiver->SendCompoundPacket();
|
||||||
|
|
||||||
rtc::Event heavy_task(false, false);
|
|
||||||
queue.PostTask(
|
|
||||||
rtc::NewClosure([&] { EXPECT_TRUE(heavy_task.Wait(kTimeoutMs)); }));
|
|
||||||
rtc::Event done(false, false);
|
rtc::Event done(false, false);
|
||||||
rtcp_transceiver->Stop(rtc::NewClosure([&done] { done.Set(); }));
|
rtc::Event heavy_task(false, false);
|
||||||
rtcp_transceiver = nullptr;
|
queue.PostTask([&] {
|
||||||
|
EXPECT_TRUE(heavy_task.Wait(kTimeoutMs));
|
||||||
|
done.Set();
|
||||||
|
});
|
||||||
|
delete rtcp_transceiver;
|
||||||
|
|
||||||
heavy_task.Set();
|
heavy_task.Set();
|
||||||
EXPECT_TRUE(done.Wait(kTimeoutMs));
|
EXPECT_TRUE(done.Wait(kTimeoutMs));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(RtcpTransceiverTest, MaySendPacketsAfterDestructor) { // i.e. Be careful!
|
||||||
|
NiceMock<MockTransport> outgoing_transport; // Must outlive queue below.
|
||||||
|
rtc::TaskQueue queue("rtcp");
|
||||||
|
RtcpTransceiverConfig config;
|
||||||
|
config.outgoing_transport = &outgoing_transport;
|
||||||
|
config.task_queue = &queue;
|
||||||
|
auto* rtcp_transceiver = new RtcpTransceiver(config);
|
||||||
|
|
||||||
|
rtc::Event heavy_task(false, false);
|
||||||
|
queue.PostTask([&] { EXPECT_TRUE(heavy_task.Wait(kTimeoutMs)); });
|
||||||
|
rtcp_transceiver->SendCompoundPacket();
|
||||||
|
delete rtcp_transceiver;
|
||||||
|
|
||||||
|
EXPECT_CALL(outgoing_transport, SendRtcp);
|
||||||
|
heavy_task.Set();
|
||||||
|
|
||||||
|
WaitPostedTasks(&queue);
|
||||||
|
}
|
||||||
|
|
||||||
// Use rtp timestamp to distinguish different incoming sender reports.
|
// Use rtp timestamp to distinguish different incoming sender reports.
|
||||||
rtc::CopyOnWriteBuffer CreateSenderReport(uint32_t ssrc, uint32_t rtp_time) {
|
rtc::CopyOnWriteBuffer CreateSenderReport(uint32_t ssrc, uint32_t rtp_time) {
|
||||||
webrtc::rtcp::SenderReport sr;
|
webrtc::rtcp::SenderReport sr;
|
||||||
|
Reference in New Issue
Block a user