Split FlexfecReceiveStreamImpl init into worker / network steps.
This is comparable to this change for AudioReceiveStream: https://webrtc-review.googlesource.com/c/src/+/220608/ Bug: webrtc:11993 Change-Id: I6bad7fa693441f80e86d8b021b8cf42727dc9142 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220609 Commit-Queue: Tommi <tommi@webrtc.org> Reviewed-by: Niels Moller <nisse@webrtc.org> Cr-Commit-Position: refs/heads/master@{#34170}
This commit is contained in:
13
call/call.cc
13
call/call.cc
@ -1164,8 +1164,12 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
|
||||
// OnRtpPacket until the constructor is finished and the object is
|
||||
// in a valid state, since OnRtpPacket runs on the same thread.
|
||||
receive_stream = new FlexfecReceiveStreamImpl(
|
||||
clock_, &video_receiver_controller_, config, recovered_packet_receiver,
|
||||
call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread());
|
||||
clock_, config, recovered_packet_receiver, call_stats_->AsRtcpRttStats(),
|
||||
module_process_thread_->process_thread());
|
||||
|
||||
// TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network
|
||||
// thread.
|
||||
receive_stream->RegisterWithTransport(&video_receiver_controller_);
|
||||
|
||||
RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
|
||||
receive_rtp_config_.end());
|
||||
@ -1180,6 +1184,11 @@ void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
|
||||
TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
|
||||
RTC_DCHECK_RUN_ON(worker_thread_);
|
||||
|
||||
FlexfecReceiveStreamImpl* receive_stream_impl =
|
||||
static_cast<FlexfecReceiveStreamImpl*>(receive_stream);
|
||||
// TODO(bugs.webrtc.org/11993): Unregister on the network thread.
|
||||
receive_stream_impl->UnregisterFromTransport();
|
||||
|
||||
RTC_DCHECK(receive_stream != nullptr);
|
||||
const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
|
||||
uint32_t ssrc = config.remote_ssrc;
|
||||
|
@ -138,7 +138,6 @@ std::unique_ptr<ModuleRtpRtcpImpl2> CreateRtpRtcpModule(
|
||||
|
||||
FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
|
||||
Clock* clock,
|
||||
RtpStreamReceiverControllerInterface* receiver_controller,
|
||||
const Config& config,
|
||||
RecoveredPacketReceiver* recovered_packet_receiver,
|
||||
RtcpRttStats* rtt_stats,
|
||||
@ -155,23 +154,11 @@ FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
|
||||
process_thread_(process_thread) {
|
||||
RTC_LOG(LS_INFO) << "FlexfecReceiveStreamImpl: " << config_.ToString();
|
||||
|
||||
network_thread_checker_.Detach();
|
||||
|
||||
// RTCP reporting.
|
||||
rtp_rtcp_->SetRTCPStatus(config_.rtcp_mode);
|
||||
process_thread_->RegisterModule(rtp_rtcp_.get(), RTC_FROM_HERE);
|
||||
|
||||
// Register with transport.
|
||||
// TODO(nisse): OnRtpPacket in this class delegates all real work to
|
||||
// |receiver_|. So maybe we don't need to implement RtpPacketSinkInterface
|
||||
// here at all, we'd then delete the OnRtpPacket method and instead register
|
||||
// |receiver_| as the RtpPacketSinkInterface for this stream.
|
||||
// TODO(nisse): Passing |this| from the constructor to the RtpDemuxer, before
|
||||
// the object is fully initialized, is risky. But it works in this case
|
||||
// because locking in our caller, Call::CreateFlexfecReceiveStream, ensures
|
||||
// that the demuxer doesn't call OnRtpPacket before this object is fully
|
||||
// constructed. Registering |receiver_| instead of |this| would solve this
|
||||
// problem too.
|
||||
rtp_stream_receiver_ =
|
||||
receiver_controller->CreateReceiver(config_.remote_ssrc, this);
|
||||
}
|
||||
|
||||
FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() {
|
||||
@ -179,6 +166,27 @@ FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() {
|
||||
process_thread_->DeRegisterModule(rtp_rtcp_.get());
|
||||
}
|
||||
|
||||
void FlexfecReceiveStreamImpl::RegisterWithTransport(
|
||||
RtpStreamReceiverControllerInterface* receiver_controller) {
|
||||
RTC_DCHECK_RUN_ON(&network_thread_checker_);
|
||||
RTC_DCHECK(!rtp_stream_receiver_);
|
||||
|
||||
if (!receiver_)
|
||||
return;
|
||||
|
||||
// TODO(nisse): OnRtpPacket in this class delegates all real work to
|
||||
// `receiver_`. So maybe we don't need to implement RtpPacketSinkInterface
|
||||
// here at all, we'd then delete the OnRtpPacket method and instead register
|
||||
// `receiver_` as the RtpPacketSinkInterface for this stream.
|
||||
rtp_stream_receiver_ =
|
||||
receiver_controller->CreateReceiver(config_.remote_ssrc, this);
|
||||
}
|
||||
|
||||
void FlexfecReceiveStreamImpl::UnregisterFromTransport() {
|
||||
RTC_DCHECK_RUN_ON(&network_thread_checker_);
|
||||
rtp_stream_receiver_.reset();
|
||||
}
|
||||
|
||||
void FlexfecReceiveStreamImpl::OnRtpPacket(const RtpPacketReceived& packet) {
|
||||
if (!receiver_)
|
||||
return;
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "call/flexfec_receive_stream.h"
|
||||
#include "call/rtp_packet_sink_interface.h"
|
||||
#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
|
||||
#include "rtc_base/system/no_unique_address.h"
|
||||
#include "system_wrappers/include/clock.h"
|
||||
|
||||
namespace webrtc {
|
||||
@ -34,13 +35,26 @@ class FlexfecReceiveStreamImpl : public FlexfecReceiveStream {
|
||||
public:
|
||||
FlexfecReceiveStreamImpl(
|
||||
Clock* clock,
|
||||
RtpStreamReceiverControllerInterface* receiver_controller,
|
||||
const Config& config,
|
||||
RecoveredPacketReceiver* recovered_packet_receiver,
|
||||
RtcpRttStats* rtt_stats,
|
||||
ProcessThread* process_thread);
|
||||
// Destruction happens on the worker thread. Prior to destruction the caller
|
||||
// must ensure that a registration with the transport has been cleared. See
|
||||
// `RegisterWithTransport` for details.
|
||||
// TODO(tommi): As a further improvement to this, performing the full
|
||||
// destruction on the network thread could be made the default.
|
||||
~FlexfecReceiveStreamImpl() override;
|
||||
|
||||
// Called on the network thread to register/unregister with the network
|
||||
// transport.
|
||||
void RegisterWithTransport(
|
||||
RtpStreamReceiverControllerInterface* receiver_controller);
|
||||
// If registration has previously been done (via `RegisterWithTransport`) then
|
||||
// `UnregisterFromTransport` must be called prior to destruction, on the
|
||||
// network thread.
|
||||
void UnregisterFromTransport();
|
||||
|
||||
// RtpPacketSinkInterface.
|
||||
void OnRtpPacket(const RtpPacketReceived& packet) override;
|
||||
|
||||
@ -48,6 +62,8 @@ class FlexfecReceiveStreamImpl : public FlexfecReceiveStream {
|
||||
const Config& GetConfig() const override;
|
||||
|
||||
private:
|
||||
RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_;
|
||||
|
||||
// Config.
|
||||
const Config config_;
|
||||
|
||||
@ -57,9 +73,10 @@ class FlexfecReceiveStreamImpl : public FlexfecReceiveStream {
|
||||
// RTCP reporting.
|
||||
const std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
|
||||
const std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp_;
|
||||
ProcessThread* process_thread_;
|
||||
ProcessThread* const process_thread_;
|
||||
|
||||
std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_;
|
||||
std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_
|
||||
RTC_GUARDED_BY(network_thread_checker_);
|
||||
};
|
||||
|
||||
} // namespace webrtc
|
||||
|
@ -89,12 +89,14 @@ class FlexfecReceiveStreamTest : public ::testing::Test {
|
||||
: config_(CreateDefaultConfig(&rtcp_send_transport_)) {
|
||||
EXPECT_CALL(process_thread_, RegisterModule(_, _)).Times(1);
|
||||
receive_stream_ = std::make_unique<FlexfecReceiveStreamImpl>(
|
||||
Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_, config_,
|
||||
&recovered_packet_receiver_, &rtt_stats_, &process_thread_);
|
||||
Clock::GetRealTimeClock(), config_, &recovered_packet_receiver_,
|
||||
&rtt_stats_, &process_thread_);
|
||||
receive_stream_->RegisterWithTransport(&rtp_stream_receiver_controller_);
|
||||
}
|
||||
|
||||
~FlexfecReceiveStreamTest() {
|
||||
EXPECT_CALL(process_thread_, DeRegisterModule(_)).Times(1);
|
||||
receive_stream_->UnregisterFromTransport();
|
||||
}
|
||||
|
||||
MockTransport rtcp_send_transport_;
|
||||
@ -145,9 +147,10 @@ TEST_F(FlexfecReceiveStreamTest, RecoversPacket) {
|
||||
|
||||
::testing::StrictMock<MockRecoveredPacketReceiver> recovered_packet_receiver;
|
||||
EXPECT_CALL(process_thread_, RegisterModule(_, _)).Times(1);
|
||||
FlexfecReceiveStreamImpl receive_stream(
|
||||
Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_, config_,
|
||||
&recovered_packet_receiver, &rtt_stats_, &process_thread_);
|
||||
FlexfecReceiveStreamImpl receive_stream(Clock::GetRealTimeClock(), config_,
|
||||
&recovered_packet_receiver,
|
||||
&rtt_stats_, &process_thread_);
|
||||
receive_stream.RegisterWithTransport(&rtp_stream_receiver_controller_);
|
||||
|
||||
EXPECT_CALL(recovered_packet_receiver,
|
||||
OnRecoveredPacket(_, kRtpHeaderSize + kPayloadLength[1]));
|
||||
@ -156,6 +159,8 @@ TEST_F(FlexfecReceiveStreamTest, RecoversPacket) {
|
||||
|
||||
// Tear-down
|
||||
EXPECT_CALL(process_thread_, DeRegisterModule(_)).Times(1);
|
||||
|
||||
receive_stream.UnregisterFromTransport();
|
||||
}
|
||||
|
||||
} // namespace webrtc
|
||||
|
Reference in New Issue
Block a user