Refactoring DataChannelController from PeerConnection part 4

This CL:
- Moved HasDataChannel and data_channel_type_
- Moved rtp_data_channels_
- Moved sctp_data_channels_
- Moved data_channel_controller to its own .h file
- Various changes to reduce the coupling between the classes
- Removed friendship between DataChannelController and PeerConnection

Bug: webrtc:11146
Change-Id: Ib8c395e4c90ce34baf40812d1dade0ffa79f2438
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/161094
Commit-Queue: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29987}
This commit is contained in:
Harald Alvestrand
2019-12-03 14:04:21 +01:00
committed by Commit Bot
parent bcce4530d4
commit 05e4d08e35
5 changed files with 449 additions and 372 deletions

View File

@ -8,22 +8,23 @@
* be found in the AUTHORS file in the root of the source tree.
*/
// This file contains the implementation of the class
// webrtc::PeerConnection::DataChannelController.
//
// The intent is that this should be webrtc::DataChannelController, but
// as a migration stage, it is simpler to have it as an inner class,
// declared in the header file pc/peer_connection.h
#include "pc/data_channel_controller.h"
#include <utility>
#include "pc/peer_connection.h"
#include "pc/sctp_utils.h"
namespace webrtc {
bool PeerConnection::DataChannelController::SendData(
const cricket::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
bool DataChannelController::HasDataChannels() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return !rtp_data_channels_.empty() || !sctp_data_channels_.empty();
}
bool DataChannelController::SendData(const cricket::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
// RTC_DCHECK_RUN_ON(signaling_thread());
if (data_channel_transport()) {
SendDataParams send_params;
@ -59,7 +60,7 @@ bool PeerConnection::DataChannelController::SendData(
return false;
}
bool PeerConnection::DataChannelController::ConnectDataChannel(
bool DataChannelController::ConnectDataChannel(
DataChannel* webrtc_data_channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
if (!rtp_data_channel() && !data_channel_transport()) {
@ -87,7 +88,7 @@ bool PeerConnection::DataChannelController::ConnectDataChannel(
return true;
}
void PeerConnection::DataChannelController::DisconnectDataChannel(
void DataChannelController::DisconnectDataChannel(
DataChannel* webrtc_data_channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
if (!rtp_data_channel() && !data_channel_transport()) {
@ -108,7 +109,7 @@ void PeerConnection::DataChannelController::DisconnectDataChannel(
}
}
void PeerConnection::DataChannelController::AddSctpDataStream(int sid) {
void DataChannelController::AddSctpDataStream(int sid) {
if (data_channel_transport()) {
network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
if (data_channel_transport()) {
@ -118,7 +119,7 @@ void PeerConnection::DataChannelController::AddSctpDataStream(int sid) {
}
}
void PeerConnection::DataChannelController::RemoveSctpDataStream(int sid) {
void DataChannelController::RemoveSctpDataStream(int sid) {
if (data_channel_transport()) {
network_thread()->Invoke<void>(RTC_FROM_HERE, [this, sid] {
if (data_channel_transport()) {
@ -128,13 +129,13 @@ void PeerConnection::DataChannelController::RemoveSctpDataStream(int sid) {
}
}
bool PeerConnection::DataChannelController::ReadyToSendData() const {
bool DataChannelController::ReadyToSendData() const {
RTC_DCHECK_RUN_ON(signaling_thread());
return (rtp_data_channel() && rtp_data_channel()->ready_to_send_data()) ||
(data_channel_transport() && data_channel_transport_ready_to_send_);
}
void PeerConnection::DataChannelController::OnDataReceived(
void DataChannelController::OnDataReceived(
int channel_id,
DataMessageType type,
const rtc::CopyOnWriteBuffer& buffer) {
@ -151,7 +152,7 @@ void PeerConnection::DataChannelController::OnDataReceived(
});
}
void PeerConnection::DataChannelController::OnChannelClosing(int channel_id) {
void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
@ -160,7 +161,7 @@ void PeerConnection::DataChannelController::OnChannelClosing(int channel_id) {
});
}
void PeerConnection::DataChannelController::OnChannelClosed(int channel_id) {
void DataChannelController::OnChannelClosed(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this, channel_id] {
@ -169,7 +170,7 @@ void PeerConnection::DataChannelController::OnChannelClosed(int channel_id) {
});
}
void PeerConnection::DataChannelController::OnReadyToSend() {
void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this] {
@ -180,12 +181,12 @@ void PeerConnection::DataChannelController::OnReadyToSend() {
});
}
void PeerConnection::DataChannelController::SetupDataChannelTransport_n() {
void DataChannelController::SetupDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_ = std::make_unique<rtc::AsyncInvoker>();
}
void PeerConnection::DataChannelController::TeardownDataChannelTransport_n() {
void DataChannelController::TeardownDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
data_channel_transport_invoker_ = nullptr;
if (data_channel_transport()) {
@ -194,7 +195,7 @@ void PeerConnection::DataChannelController::TeardownDataChannelTransport_n() {
set_data_channel_transport(nullptr);
}
void PeerConnection::DataChannelController::OnTransportChanged(
void DataChannelController::OnTransportChanged(
DataChannelTransportInterface* new_data_channel_transport) {
RTC_DCHECK_RUN_ON(network_thread());
if (data_channel_transport() &&
@ -211,8 +212,8 @@ void PeerConnection::DataChannelController::OnTransportChanged(
// necessary when bundling is applied.
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this] {
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
for (auto channel : pc_->sctp_data_channels_) {
RTC_DCHECK_RUN_ON(signaling_thread());
for (auto channel : sctp_data_channels_) {
channel->OnTransportChannelCreated();
}
});
@ -220,7 +221,7 @@ void PeerConnection::DataChannelController::OnTransportChanged(
}
}
bool PeerConnection::DataChannelController::HandleOpenMessage_s(
bool DataChannelController::HandleOpenMessage_s(
const cricket::ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& buffer) {
if (params.type == cricket::DMT_CONTROL && IsOpenMessage(buffer)) {
@ -241,7 +242,7 @@ bool PeerConnection::DataChannelController::HandleOpenMessage_s(
return false;
}
void PeerConnection::DataChannelController::OnDataChannelOpenMessage(
void DataChannelController::OnDataChannelOpenMessage(
const std::string& label,
const InternalDataChannelInit& config) {
rtc::scoped_refptr<DataChannel> channel(
@ -253,29 +254,26 @@ void PeerConnection::DataChannelController::OnDataChannelOpenMessage(
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
{
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
pc_->NoteUsageEvent(UsageEvent::DATA_ADDED);
}
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
pc_->NoteDataAddedEvent();
}
rtc::scoped_refptr<DataChannel>
PeerConnection::DataChannelController::InternalCreateDataChannel(
DataChannelController::InternalCreateDataChannel(
const std::string& label,
const InternalDataChannelInit* config) {
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
RTC_DCHECK_RUN_ON(signaling_thread());
if (pc_->IsClosed()) {
return nullptr;
}
if (pc_->data_channel_type() == cricket::DCT_NONE) {
if (data_channel_type_ == cricket::DCT_NONE) {
RTC_LOG(LS_ERROR)
<< "InternalCreateDataChannel: Data is not supported in this call.";
return nullptr;
}
InternalDataChannelInit new_config =
config ? (*config) : InternalDataChannelInit();
if (DataChannel::IsSctpLike(pc_->data_channel_type_)) {
if (DataChannel::IsSctpLike(data_channel_type_)) {
if (new_config.id < 0) {
rtc::SSLRole role;
if ((pc_->GetSctpSslRole(&role)) &&
@ -292,36 +290,33 @@ PeerConnection::DataChannelController::InternalCreateDataChannel(
}
rtc::scoped_refptr<DataChannel> channel(
DataChannel::Create(this, pc_->data_channel_type(), label, new_config));
DataChannel::Create(this, data_channel_type(), label, new_config));
if (!channel) {
sid_allocator_.ReleaseSid(new_config.id);
return nullptr;
}
if (channel->data_channel_type() == cricket::DCT_RTP) {
if (pc_->rtp_data_channels_.find(channel->label()) !=
pc_->rtp_data_channels_.end()) {
if (rtp_data_channels_.find(channel->label()) != rtp_data_channels_.end()) {
RTC_LOG(LS_ERROR) << "DataChannel with label " << channel->label()
<< " already exists.";
return nullptr;
}
pc_->rtp_data_channels_[channel->label()] = channel;
rtp_data_channels_[channel->label()] = channel;
} else {
RTC_DCHECK(DataChannel::IsSctpLike(pc_->data_channel_type_));
pc_->sctp_data_channels_.push_back(channel);
RTC_DCHECK(DataChannel::IsSctpLike(data_channel_type_));
sctp_data_channels_.push_back(channel);
channel->SignalClosed.connect(pc_,
&PeerConnection::OnSctpDataChannelClosed);
}
pc_->SignalDataChannelCreated_(channel.get());
SignalDataChannelCreated_(channel.get());
return channel;
}
void PeerConnection::DataChannelController::AllocateSctpSids(
rtc::SSLRole role) {
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
RTC_DCHECK_RUN_ON(signaling_thread());
std::vector<rtc::scoped_refptr<DataChannel>> channels_to_close;
for (const auto& channel : pc_->sctp_data_channels_) {
for (const auto& channel : sctp_data_channels_) {
if (channel->id() < 0) {
int sid;
if (!sid_allocator_.AllocateSid(role, &sid)) {
@ -339,11 +334,10 @@ void PeerConnection::DataChannelController::AllocateSctpSids(
}
}
void PeerConnection::DataChannelController::OnSctpDataChannelClosed(
DataChannel* channel) {
RTC_DCHECK_RUN_ON(pc_->signaling_thread());
for (auto it = pc_->sctp_data_channels_.begin();
it != pc_->sctp_data_channels_.end(); ++it) {
void DataChannelController::OnSctpDataChannelClosed(DataChannel* channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
++it) {
if (it->get() == channel) {
if (channel->id() >= 0) {
// After the closing procedure is done, it's safe to use this ID for
@ -352,12 +346,138 @@ void PeerConnection::DataChannelController::OnSctpDataChannelClosed(
}
// Since this method is triggered by a signal from the DataChannel,
// we can't free it directly here; we need to free it asynchronously.
pc_->sctp_data_channels_to_free_.push_back(*it);
pc_->sctp_data_channels_.erase(it);
sctp_data_channels_to_free_.push_back(*it);
sctp_data_channels_.erase(it);
pc_->SignalFreeDataChannels();
return;
}
}
}
void DataChannelController::OnTransportChannelClosed() {
RTC_DCHECK_RUN_ON(signaling_thread());
// Use a temporary copy of the RTP/SCTP DataChannel list because the
// DataChannel may callback to us and try to modify the list.
std::map<std::string, rtc::scoped_refptr<DataChannel>> temp_rtp_dcs;
temp_rtp_dcs.swap(rtp_data_channels_);
for (const auto& kv : temp_rtp_dcs) {
kv.second->OnTransportChannelClosed();
}
std::vector<rtc::scoped_refptr<DataChannel>> temp_sctp_dcs;
temp_sctp_dcs.swap(sctp_data_channels_);
for (const auto& channel : temp_sctp_dcs) {
channel->OnTransportChannelClosed();
}
}
DataChannel* DataChannelController::FindDataChannelBySid(int sid) const {
RTC_DCHECK_RUN_ON(signaling_thread());
for (const auto& channel : sctp_data_channels_) {
if (channel->id() == sid) {
return channel;
}
}
return nullptr;
}
void DataChannelController::UpdateLocalRtpDataChannels(
const cricket::StreamParamsVec& streams) {
std::vector<std::string> existing_channels;
RTC_DCHECK_RUN_ON(signaling_thread());
// Find new and active data channels.
for (const cricket::StreamParams& params : streams) {
// |it->sync_label| is actually the data channel label. The reason is that
// we use the same naming of data channels as we do for
// MediaStreams and Tracks.
// For MediaStreams, the sync_label is the MediaStream label and the
// track label is the same as |streamid|.
const std::string& channel_label = params.first_stream_id();
auto data_channel_it = rtp_data_channels()->find(channel_label);
if (data_channel_it == rtp_data_channels()->end()) {
RTC_LOG(LS_ERROR) << "channel label not found";
continue;
}
// Set the SSRC the data channel should use for sending.
data_channel_it->second->SetSendSsrc(params.first_ssrc());
existing_channels.push_back(data_channel_it->first);
}
UpdateClosingRtpDataChannels(existing_channels, true);
}
void DataChannelController::UpdateRemoteRtpDataChannels(
const cricket::StreamParamsVec& streams) {
std::vector<std::string> existing_channels;
RTC_DCHECK_RUN_ON(signaling_thread());
// Find new and active data channels.
for (const cricket::StreamParams& params : streams) {
// The data channel label is either the mslabel or the SSRC if the mslabel
// does not exist. Ex a=ssrc:444330170 mslabel:test1.
std::string label = params.first_stream_id().empty()
? rtc::ToString(params.first_ssrc())
: params.first_stream_id();
auto data_channel_it = rtp_data_channels()->find(label);
if (data_channel_it == rtp_data_channels()->end()) {
// This is a new data channel.
CreateRemoteRtpDataChannel(label, params.first_ssrc());
} else {
data_channel_it->second->SetReceiveSsrc(params.first_ssrc());
}
existing_channels.push_back(label);
}
UpdateClosingRtpDataChannels(existing_channels, false);
}
void DataChannelController::UpdateClosingRtpDataChannels(
const std::vector<std::string>& active_channels,
bool is_local_update) {
auto it = rtp_data_channels_.begin();
while (it != rtp_data_channels_.end()) {
DataChannel* data_channel = it->second;
if (absl::c_linear_search(active_channels, data_channel->label())) {
++it;
continue;
}
if (is_local_update) {
data_channel->SetSendSsrc(0);
} else {
data_channel->RemotePeerRequestClose();
}
if (data_channel->state() == DataChannel::kClosed) {
rtp_data_channels_.erase(it);
it = rtp_data_channels_.begin();
} else {
++it;
}
}
}
void DataChannelController::CreateRemoteRtpDataChannel(const std::string& label,
uint32_t remote_ssrc) {
rtc::scoped_refptr<DataChannel> channel(
InternalCreateDataChannel(label, nullptr));
if (!channel.get()) {
RTC_LOG(LS_WARNING) << "Remote peer requested a DataChannel but"
"CreateDataChannel failed.";
return;
}
channel->SetReceiveSsrc(remote_ssrc);
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
}
rtc::Thread* DataChannelController::network_thread() const {
return pc_->network_thread();
}
rtc::Thread* DataChannelController::signaling_thread() const {
return pc_->signaling_thread();
}
} // namespace webrtc