Update libjingle to 50191337.

R=mallinath@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/1885005

git-svn-id: http://webrtc.googlecode.com/svn/trunk@4461 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
wu@webrtc.org
2013-08-01 00:00:07 +00:00
parent d3ae3c7b1f
commit d64719d895
31 changed files with 479 additions and 278 deletions

View File

@ -34,7 +34,8 @@
namespace webrtc {
static size_t kMaxQueuedDataPackets = 100;
static size_t kMaxQueuedReceivedDataPackets = 100;
static size_t kMaxQueuedSendDataPackets = 100;
talk_base::scoped_refptr<DataChannel> DataChannel::Create(
WebRtcSession* session,
@ -95,12 +96,13 @@ bool DataChannel::HasNegotiationCompleted() {
}
DataChannel::~DataChannel() {
ClearQueuedData();
ClearQueuedReceivedData();
ClearQueuedSendData();
}
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
observer_ = observer;
DeliverQueuedData();
DeliverQueuedReceivedData();
}
void DataChannel::UnregisterObserver() {
@ -117,7 +119,13 @@ bool DataChannel::reliable() const {
}
uint64 DataChannel::buffered_amount() const {
return 0;
uint64 buffered_amount = 0;
for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
it != queued_send_data_.end();
++it) {
buffered_amount += (*it)->size();
}
return buffered_amount;
}
void DataChannel::Close() {
@ -133,20 +141,22 @@ bool DataChannel::Send(const DataBuffer& buffer) {
if (state_ != kOpen) {
return false;
}
cricket::SendDataParams send_params;
send_params.ssrc = send_ssrc_;
if (session_->data_channel_type() == cricket::DCT_SCTP) {
send_params.ordered = config_.ordered;
send_params.max_rtx_count = config_.maxRetransmits;
send_params.max_rtx_ms = config_.maxRetransmitTime;
// If the queue is non-empty, we're waiting for SignalReadyToSend,
// so just add to the end of the queue and keep waiting.
if (!queued_send_data_.empty()) {
return QueueSendData(buffer);
}
send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
cricket::SendDataResult send_result;
// TODO(pthatcher): Use send_result.would_block for buffering.
return session_->data_channel()->SendData(
send_params, buffer.data, &send_result);
if (!InternalSendWithoutQueueing(buffer, &send_result)) {
if (send_result == cricket::SDR_BLOCK) {
return QueueSendData(buffer);
}
// Fail for other results.
// TODO(jiayl): We should close the data channel in this case.
return false;
}
return true;
}
void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
@ -183,6 +193,43 @@ void DataChannel::OnDataEngineClose() {
DoClose();
}
void DataChannel::OnDataReceived(cricket::DataChannel* channel,
const cricket::ReceiveDataParams& params,
const talk_base::Buffer& payload) {
if (params.ssrc != receive_ssrc_) {
return;
}
bool binary = (params.type == cricket::DMT_BINARY);
talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
if (was_ever_writable_ && observer_) {
observer_->OnMessage(*buffer.get());
} else {
if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
// TODO(jiayl): We should close the data channel in this case.
LOG(LS_ERROR)
<< "Queued received data exceeds the max number of packes.";
ClearQueuedReceivedData();
}
queued_received_data_.push(buffer.release());
}
}
void DataChannel::OnChannelReady(bool writable) {
if (!writable) {
return;
}
// Update the readyState if the channel is writable for the first time;
// otherwise it means the channel was blocked for sending and now unblocked,
// so send the queued data now.
if (!was_ever_writable_) {
was_ever_writable_ = true;
UpdateState();
} else if (state_ == kOpen) {
SendQueuedSendData();
}
}
void DataChannel::DoClose() {
receive_ssrc_set_ = false;
send_ssrc_set_ = false;
@ -201,7 +248,7 @@ void DataChannel::UpdateState() {
SetState(kOpen);
// If we have received buffers before the channel got writable.
// Deliver them now.
DeliverQueuedData();
DeliverQueuedReceivedData();
}
}
break;
@ -249,47 +296,76 @@ void DataChannel::DisconnectFromDataSession() {
data_session_ = NULL;
}
void DataChannel::DeliverQueuedData() {
if (was_ever_writable_ && observer_) {
while (!queued_data_.empty()) {
DataBuffer* buffer = queued_data_.front();
observer_->OnMessage(*buffer);
queued_data_.pop();
delete buffer;
}
void DataChannel::DeliverQueuedReceivedData() {
if (!was_ever_writable_ || !observer_) {
return;
}
}
void DataChannel::ClearQueuedData() {
while (!queued_data_.empty()) {
DataBuffer* buffer = queued_data_.front();
queued_data_.pop();
while (!queued_received_data_.empty()) {
DataBuffer* buffer = queued_received_data_.front();
observer_->OnMessage(*buffer);
queued_received_data_.pop();
delete buffer;
}
}
void DataChannel::OnDataReceived(cricket::DataChannel* channel,
const cricket::ReceiveDataParams& params,
const talk_base::Buffer& payload) {
if (params.ssrc == receive_ssrc_) {
bool binary = false;
talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
if (was_ever_writable_ && observer_) {
observer_->OnMessage(*buffer.get());
} else {
if (queued_data_.size() > kMaxQueuedDataPackets) {
ClearQueuedData();
}
queued_data_.push(buffer.release());
}
void DataChannel::ClearQueuedReceivedData() {
while (!queued_received_data_.empty()) {
DataBuffer* buffer = queued_received_data_.front();
queued_received_data_.pop();
delete buffer;
}
}
void DataChannel::OnChannelReady(bool writable) {
if (!was_ever_writable_ && writable) {
was_ever_writable_ = true;
UpdateState();
void DataChannel::SendQueuedSendData() {
if (!was_ever_writable_) {
return;
}
while (!queued_send_data_.empty()) {
DataBuffer* buffer = queued_send_data_.front();
cricket::SendDataResult send_result;
if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
LOG(LS_WARNING) << "SendQueuedSendData aborted due to send_result "
<< send_result;
break;
}
queued_send_data_.pop_front();
delete buffer;
}
}
void DataChannel::ClearQueuedSendData() {
while (!queued_received_data_.empty()) {
DataBuffer* buffer = queued_received_data_.front();
queued_received_data_.pop();
delete buffer;
}
}
bool DataChannel::InternalSendWithoutQueueing(
const DataBuffer& buffer, cricket::SendDataResult* send_result) {
cricket::SendDataParams send_params;
send_params.ssrc = send_ssrc_;
if (session_->data_channel_type() == cricket::DCT_SCTP) {
send_params.ordered = config_.ordered;
send_params.max_rtx_count = config_.maxRetransmits;
send_params.max_rtx_ms = config_.maxRetransmitTime;
}
send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
return session_->data_channel()->SendData(send_params, buffer.data,
send_result);
}
bool DataChannel::QueueSendData(const DataBuffer& buffer) {
if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
return false;
}
queued_send_data_.push_back(new DataBuffer(buffer));
return true;
}
} // namespace webrtc