
As documented in webrtc:11908, this cleanup is fairly invasive and, when it is part of a frequently executed code path, can be quite costly in terms of performance overhead. This is currently the case with synchronous calls between threads (Thread) as well as with our proxy API classes. With this CL, all code in WebRTC should now either be using MessageHandlerAutoCleanup or calling MessageHandler(false) explicitly. Next steps will be to update external code to either depend on the AutoCleanup variant or call MessageHandler(false), and to change the proxy classes to use the TaskQueue set of concepts instead of MessageHandler. This avoids the perf overhead related to the cleanup above as well as incompatibility with the thread policy checks in Thread that some current external users of the proxies would otherwise run into (if we were to use Thread::Send() for synchronous calls). Following this, we'll move the cleanup step into the AutoCleanup class and add an RTC_DCHECK that all calls to the MessageHandler are setting the flag to false, before eventually removing the flag and making MessageHandler pure virtual. Bug: webrtc:11908 Change-Id: Idf4ff9bcc8438cb8c583777e282005e0bc511c8f Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/183442 Reviewed-by: Artem Titov <titovartem@webrtc.org> Commit-Queue: Tommi <tommi@webrtc.org> Cr-Commit-Position: refs/heads/master@{#32049}
513 lines
15 KiB
C++
513 lines
15 KiB
C++
/*
|
|
* Copyright 2012 The WebRTC Project Authors. All rights reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
|
|
#include "examples/peerconnection/client/peer_connection_client.h"

#include <errno.h>
#include <stdlib.h>

#include "examples/peerconnection/client/defaults.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/net_helpers.h"

#ifdef WIN32
#include "rtc_base/win32_socket_server.h"
#endif
|
|
|
|
namespace {
|
|
|
|
// Message body that signals a hang-up. It is sent by SendHangUp() and
// recognised in OnMessageFromPeer().
constexpr char kByeMessage[] = "BYE";
// How long to wait before retrying after the server refused the control
// connection, in milliseconds.
constexpr int kReconnectDelay = 2000;
|
|
|
|
// Creates a SOCK_STREAM async socket for the given address `family`.
//
// On WIN32 a new rtc::Win32Socket is allocated; on POSIX the socket comes from
// the current thread's socket server. Callers in this file hand the returned
// pointer straight to a smart pointer's reset().
rtc::AsyncSocket* CreateClientSocket(int family) {
#ifdef WIN32
  rtc::Win32Socket* win_socket = new rtc::Win32Socket();
  win_socket->CreateT(family, SOCK_STREAM);
  return win_socket;
#elif defined(WEBRTC_POSIX)
  rtc::Thread* current_thread = rtc::Thread::Current();
  RTC_DCHECK(current_thread != nullptr);
  return current_thread->socketserver()->CreateAsyncSocket(family,
                                                           SOCK_STREAM);
#else
#error Platform not supported.
#endif
}
|
|
|
|
} // namespace
|
|
|
|
// Starts out disconnected: no observer, no resolver in flight and no
// server-assigned id (-1). `false` is forwarded to the MessageHandler base.
PeerConnectionClient::PeerConnectionClient()
    : MessageHandler(false),
      callback_(nullptr),
      resolver_(nullptr),
      state_(NOT_CONNECTED),
      my_id_(-1) {}
|
|
|
|
PeerConnectionClient::~PeerConnectionClient() {
  // OnClose() may have posted a delayed retry message addressed to `this` on
  // the current thread; presumably Clear() drops such pending messages so
  // none is delivered to a destroyed handler.
  rtc::Thread* const current_thread = rtc::Thread::Current();
  current_thread->Clear(this);
}
|
|
|
|
// Hooks the close/connect/read signals of both sockets up to this object.
// Both sockets must already exist.
void PeerConnectionClient::InitSocketSignals() {
  RTC_DCHECK(control_socket_.get() != nullptr);
  RTC_DCHECK(hanging_get_.get() != nullptr);

  // Control socket: carries sign-in, sign-out and outgoing peer messages.
  control_socket_->SignalCloseEvent.connect(this,
                                            &PeerConnectionClient::OnClose);
  control_socket_->SignalConnectEvent.connect(
      this, &PeerConnectionClient::OnConnect);
  control_socket_->SignalReadEvent.connect(this,
                                           &PeerConnectionClient::OnRead);

  // Hanging-GET socket: shares the close handler with the control socket but
  // has its own connect and read handlers.
  hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose);
  hanging_get_->SignalConnectEvent.connect(
      this, &PeerConnectionClient::OnHangingGetConnect);
  hanging_get_->SignalReadEvent.connect(
      this, &PeerConnectionClient::OnHangingGetRead);
}
|
|
|
|
int PeerConnectionClient::id() const {
|
|
return my_id_;
|
|
}
|
|
|
|
bool PeerConnectionClient::is_connected() const {
|
|
return my_id_ != -1;
|
|
}
|
|
|
|
// Returns the currently known peers, keyed by peer id. The reference refers to
// internal state that is updated by the read handlers and cleared by Close().
const Peers& PeerConnectionClient::peers() const {
  return peers_;
}
|
|
|
|
// Installs the observer that receives all client notifications. Only one
// observer may be registered (checked in debug builds); the pointer is stored
// as-is.
void PeerConnectionClient::RegisterObserver(
    PeerConnectionClientObserver* callback) {
  RTC_DCHECK(callback_ == nullptr);
  callback_ = callback;
}
|
|
|
|
void PeerConnectionClient::Connect(const std::string& server,
|
|
int port,
|
|
const std::string& client_name) {
|
|
RTC_DCHECK(!server.empty());
|
|
RTC_DCHECK(!client_name.empty());
|
|
|
|
if (state_ != NOT_CONNECTED) {
|
|
RTC_LOG(WARNING)
|
|
<< "The client must not be connected before you can call Connect()";
|
|
callback_->OnServerConnectionFailure();
|
|
return;
|
|
}
|
|
|
|
if (server.empty() || client_name.empty()) {
|
|
callback_->OnServerConnectionFailure();
|
|
return;
|
|
}
|
|
|
|
if (port <= 0)
|
|
port = kDefaultServerPort;
|
|
|
|
server_address_.SetIP(server);
|
|
server_address_.SetPort(port);
|
|
client_name_ = client_name;
|
|
|
|
if (server_address_.IsUnresolvedIP()) {
|
|
state_ = RESOLVING;
|
|
resolver_ = new rtc::AsyncResolver();
|
|
resolver_->SignalDone.connect(this, &PeerConnectionClient::OnResolveResult);
|
|
resolver_->Start(server_address_);
|
|
} else {
|
|
DoConnect();
|
|
}
|
|
}
|
|
|
|
// Completion handler for the lookup started in Connect(). The result is read
// from `resolver_`; the `resolver` argument is not used.
//
// On success the resolved address is adopted and the connection proceeds. On
// failure the observer is notified, the resolver is destroyed and the client
// returns to NOT_CONNECTED.
void PeerConnectionClient::OnResolveResult(
    rtc::AsyncResolverInterface* resolver) {
  const bool resolved = (resolver_->GetError() == 0);
  if (resolved) {
    server_address_ = resolver_->address();
    DoConnect();
    return;
  }

  callback_->OnServerConnectionFailure();
  resolver_->Destroy(false);
  resolver_ = nullptr;
  state_ = NOT_CONNECTED;
}
|
|
|
|
void PeerConnectionClient::DoConnect() {
|
|
control_socket_.reset(CreateClientSocket(server_address_.ipaddr().family()));
|
|
hanging_get_.reset(CreateClientSocket(server_address_.ipaddr().family()));
|
|
InitSocketSignals();
|
|
char buffer[1024];
|
|
snprintf(buffer, sizeof(buffer), "GET /sign_in?%s HTTP/1.0\r\n\r\n",
|
|
client_name_.c_str());
|
|
onconnect_data_ = buffer;
|
|
|
|
bool ret = ConnectControlSocket();
|
|
if (ret)
|
|
state_ = SIGNING_IN;
|
|
if (!ret) {
|
|
callback_->OnServerConnectionFailure();
|
|
}
|
|
}
|
|
|
|
bool PeerConnectionClient::SendToPeer(int peer_id, const std::string& message) {
|
|
if (state_ != CONNECTED)
|
|
return false;
|
|
|
|
RTC_DCHECK(is_connected());
|
|
RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
|
|
if (!is_connected() || peer_id == -1)
|
|
return false;
|
|
|
|
char headers[1024];
|
|
snprintf(headers, sizeof(headers),
|
|
"POST /message?peer_id=%i&to=%i HTTP/1.0\r\n"
|
|
"Content-Length: %zu\r\n"
|
|
"Content-Type: text/plain\r\n"
|
|
"\r\n",
|
|
my_id_, peer_id, message.length());
|
|
onconnect_data_ = headers;
|
|
onconnect_data_ += message;
|
|
return ConnectControlSocket();
|
|
}
|
|
|
|
bool PeerConnectionClient::SendHangUp(int peer_id) {
|
|
return SendToPeer(peer_id, kByeMessage);
|
|
}
|
|
|
|
bool PeerConnectionClient::IsSendingMessage() {
|
|
return state_ == CONNECTED &&
|
|
control_socket_->GetState() != rtc::Socket::CS_CLOSED;
|
|
}
|
|
|
|
bool PeerConnectionClient::SignOut() {
|
|
if (state_ == NOT_CONNECTED || state_ == SIGNING_OUT)
|
|
return true;
|
|
|
|
if (hanging_get_->GetState() != rtc::Socket::CS_CLOSED)
|
|
hanging_get_->Close();
|
|
|
|
if (control_socket_->GetState() == rtc::Socket::CS_CLOSED) {
|
|
state_ = SIGNING_OUT;
|
|
|
|
if (my_id_ != -1) {
|
|
char buffer[1024];
|
|
snprintf(buffer, sizeof(buffer),
|
|
"GET /sign_out?peer_id=%i HTTP/1.0\r\n\r\n", my_id_);
|
|
onconnect_data_ = buffer;
|
|
return ConnectControlSocket();
|
|
} else {
|
|
// Can occur if the app is closed before we finish connecting.
|
|
return true;
|
|
}
|
|
} else {
|
|
state_ = SIGNING_OUT_WAITING;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void PeerConnectionClient::Close() {
|
|
control_socket_->Close();
|
|
hanging_get_->Close();
|
|
onconnect_data_.clear();
|
|
peers_.clear();
|
|
if (resolver_ != NULL) {
|
|
resolver_->Destroy(false);
|
|
resolver_ = NULL;
|
|
}
|
|
my_id_ = -1;
|
|
state_ = NOT_CONNECTED;
|
|
}
|
|
|
|
bool PeerConnectionClient::ConnectControlSocket() {
|
|
RTC_DCHECK(control_socket_->GetState() == rtc::Socket::CS_CLOSED);
|
|
int err = control_socket_->Connect(server_address_);
|
|
if (err == SOCKET_ERROR) {
|
|
Close();
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
// Connect handler for the control socket: writes the request queued in
// `onconnect_data_` and then clears it. Debug builds check that the whole
// request was accepted by Send().
void PeerConnectionClient::OnConnect(rtc::AsyncSocket* socket) {
  RTC_DCHECK(!onconnect_data_.empty());

  const size_t expected = onconnect_data_.length();
  size_t sent = socket->Send(onconnect_data_.c_str(), expected);
  RTC_DCHECK(sent == expected);

  onconnect_data_.clear();
}
|
|
|
|
// Connect handler for the hanging-GET socket: issues the "/wait" request for
// our id. Debug builds check that the whole request was accepted by Send().
void PeerConnectionClient::OnHangingGetConnect(rtc::AsyncSocket* socket) {
  char request[1024];
  snprintf(request, sizeof(request), "GET /wait?peer_id=%i HTTP/1.0\r\n\r\n",
           my_id_);

  const int request_length = static_cast<int>(strlen(request));
  const int sent = socket->Send(request, request_length);
  RTC_DCHECK(sent == request_length);
}
|
|
|
|
void PeerConnectionClient::OnMessageFromPeer(int peer_id,
|
|
const std::string& message) {
|
|
if (message.length() == (sizeof(kByeMessage) - 1) &&
|
|
message.compare(kByeMessage) == 0) {
|
|
callback_->OnPeerDisconnected(peer_id);
|
|
} else {
|
|
callback_->OnMessageFromPeer(peer_id, message);
|
|
}
|
|
}
|
|
|
|
// Extracts the non-negative integer that follows `header_pattern` in `data`.
//
// `data` is the raw response and `eoh` the offset of the end of its headers;
// the pattern only counts when it starts before `eoh`. On success the number
// is stored in `*value` and true is returned. Returns false, leaving `*value`
// untouched, when the header is missing or its value is negative or does not
// fit in size_t. As with atoi(), text that does not start with a number
// yields 0.
bool PeerConnectionClient::GetHeaderValue(const std::string& data,
                                          size_t eoh,
                                          const char* header_pattern,
                                          size_t* value) {
  RTC_DCHECK(value != nullptr);
  const size_t found = data.find(header_pattern);
  if (found == std::string::npos || found >= eoh) {
    return false;
  }

  // strtoll() instead of atoi(): atoi() has undefined behaviour on
  // out-of-range input, and a negative number would wrap to a huge size_t.
  const char* const number_text =
      data.c_str() + found + strlen(header_pattern);
  errno = 0;
  const long long parsed = strtoll(number_text, nullptr, 10);
  if (errno == ERANGE || parsed < 0) {
    return false;
  }

  // Reject values that size_t cannot represent (possible on 32-bit targets).
  const size_t narrowed = static_cast<size_t>(parsed);
  if (static_cast<long long>(narrowed) != parsed) {
    return false;
  }

  *value = narrowed;
  return true;
}
|
|
|
|
// Extracts the text that follows `header_pattern` in `data`, up to the next
// CRLF (or up to `eoh` if no CRLF follows).
//
// The pattern only counts when it starts before `eoh`, the offset of the end
// of the headers. Returns true and stores the text in `*value` on success;
// returns false and leaves `*value` untouched when the header is missing.
bool PeerConnectionClient::GetHeaderValue(const std::string& data,
                                          size_t eoh,
                                          const char* header_pattern,
                                          std::string* value) {
  RTC_DCHECK(value != nullptr);

  const size_t header_pos = data.find(header_pattern);
  if (header_pos == std::string::npos || header_pos >= eoh) {
    return false;
  }

  const size_t value_begin = header_pos + strlen(header_pattern);
  size_t value_end = data.find("\r\n", value_begin);
  if (value_end == std::string::npos) {
    value_end = eoh;
  }

  value->assign(data.substr(value_begin, value_end - value_begin));
  return true;
}
|
|
|
|
// Appends everything currently readable from `socket` to `*data` and checks
// whether a complete HTTP response has arrived.
//
// Returns true once `*data` holds the full headers plus at least the number
// of body bytes announced by Content-Length, which is stored in
// `*content_length`. If the response carries "Connection: close", the socket
// is closed and OnClose() is invoked by hand. Returns false while the
// response is incomplete or when no Content-Length header is present.
bool PeerConnectionClient::ReadIntoBuffer(rtc::AsyncSocket* socket,
                                          std::string* data,
                                          size_t* content_length) {
  // Drain the socket until Recv() reports nothing more to read.
  char chunk[0xffff];
  for (;;) {
    const int received = socket->Recv(chunk, sizeof(chunk), nullptr);
    if (received <= 0) {
      break;
    }
    data->append(chunk, received);
  }

  const size_t header_end = data->find("\r\n\r\n");
  if (header_end == std::string::npos) {
    return false;
  }

  RTC_LOG(INFO) << "Headers received";
  if (!GetHeaderValue(*data, header_end, "\r\nContent-Length: ",
                      content_length)) {
    RTC_LOG(LS_ERROR) << "No content length field specified by the server.";
    return false;
  }

  // Headers, the 4-byte blank-line separator, then the body.
  const size_t total_response_size = (header_end + 4) + *content_length;
  if (data->length() < total_response_size) {
    // We haven't received everything. Just continue to accept data.
    return false;
  }

  const char kConnection[] = "\r\nConnection: ";
  std::string connection_value;
  const bool has_connection_header =
      GetHeaderValue(*data, header_end, kConnection, &connection_value);
  if (has_connection_header && connection_value.compare("close") == 0) {
    socket->Close();
    // Since we closed the socket, there was no notification delivered
    // to us. Compensate by letting ourselves know.
    OnClose(socket, 0);
  }
  return true;
}
|
|
|
|
// Read handler for the control socket.
//
// Once a complete response has been buffered:
//  * the first successful response stores our server-assigned id, registers
//    every listed peer other than ourselves and reports OnSignedIn();
//  * in SIGNING_OUT the client is closed and OnDisconnected() is reported;
//  * in SIGNING_OUT_WAITING the deferred SignOut() is issued.
// The buffer is then cleared and, if still SIGNING_IN, the client moves to
// CONNECTED and starts the hanging GET.
void PeerConnectionClient::OnRead(rtc::AsyncSocket* socket) {
  size_t content_length = 0;
  if (!ReadIntoBuffer(socket, &control_data_, &content_length)) {
    return;
  }

  size_t peer_id = 0;
  size_t eoh = 0;
  const bool parsed =
      ParseServerResponse(control_data_, content_length, &peer_id, &eoh);
  if (parsed) {
    if (my_id_ == -1) {
      // First response. Let's store our server assigned ID.
      RTC_DCHECK(state_ == SIGNING_IN);
      my_id_ = static_cast<int>(peer_id);
      RTC_DCHECK(my_id_ != -1);

      // The body of the response will be a list of already connected peers,
      // one newline-terminated entry per peer.
      if (content_length) {
        for (size_t line_start = eoh + 4;
             line_start < control_data_.size();) {
          const size_t line_end = control_data_.find('\n', line_start);
          if (line_end == std::string::npos) {
            break;
          }

          const std::string entry =
              control_data_.substr(line_start, line_end - line_start);
          int id = 0;
          std::string name;
          bool connected = false;
          if (ParseEntry(entry, &name, &id, &connected) && id != my_id_) {
            peers_[id] = name;
            callback_->OnPeerConnected(id, name);
          }
          line_start = line_end + 1;
        }
      }
      RTC_DCHECK(is_connected());
      callback_->OnSignedIn();
    } else if (state_ == SIGNING_OUT) {
      Close();
      callback_->OnDisconnected();
    } else if (state_ == SIGNING_OUT_WAITING) {
      SignOut();
    }
  }

  control_data_.clear();

  if (state_ == SIGNING_IN) {
    RTC_DCHECK(hanging_get_->GetState() == rtc::Socket::CS_CLOSED);
    state_ = CONNECTED;
    hanging_get_->Connect(server_address_);
  }
}
|
|
|
|
// Read handler for the hanging-GET socket.
//
// Once a complete response has been buffered in `notification_data_`:
//  * if it carries our own id, the body is a peer entry announcing that a
//    peer connected or disconnected, and the peer list and observer are
//    updated;
//  * otherwise the body is a message from the peer with that id.
// Afterwards the hanging GET is re-issued if the socket is closed and the
// client is still CONNECTED.
void PeerConnectionClient::OnHangingGetRead(rtc::AsyncSocket* socket) {
  RTC_LOG(INFO) << __FUNCTION__;
  size_t content_length = 0;
  if (ReadIntoBuffer(socket, &notification_data_, &content_length)) {
    size_t peer_id = 0, eoh = 0;
    bool ok =
        ParseServerResponse(notification_data_, content_length, &peer_id, &eoh);

    if (ok) {
      // Store the position where the body begins.
      size_t pos = eoh + 4;

      if (my_id_ == static_cast<int>(peer_id)) {
        // A notification about a new member or a member that just
        // disconnected.
        int id = 0;
        std::string name;
        bool connected = false;
        if (ParseEntry(notification_data_.substr(pos), &name, &id,
                       &connected)) {
          if (connected) {
            peers_[id] = name;
            callback_->OnPeerConnected(id, name);
          } else {
            peers_.erase(id);
            callback_->OnPeerDisconnected(id);
          }
        }
      } else {
        OnMessageFromPeer(static_cast<int>(peer_id),
                          notification_data_.substr(pos));
      }
    }

    notification_data_.clear();
  }

  if (hanging_get_->GetState() == rtc::Socket::CS_CLOSED &&
      state_ == CONNECTED) {
    hanging_get_->Connect(server_address_);
  }
}
|
|
|
|
// Parses one peer entry of the form "name,id[,connected]".
//
// On return `*name` holds the text before the first comma, `*id` the integer
// after it and `*connected` whether the optional third field is non-zero
// (false when absent). Returns true when a non-empty name was parsed. If the
// entry contains no comma, `*name` is left empty, `*id` is not modified and
// false is returned.
bool PeerConnectionClient::ParseEntry(const std::string& entry,
                                      std::string* name,
                                      int* id,
                                      bool* connected) {
  RTC_DCHECK(name != nullptr);
  RTC_DCHECK(id != nullptr);
  RTC_DCHECK(connected != nullptr);
  RTC_DCHECK(!entry.empty());

  // Reset the outputs that drive the result, so stale caller content in
  // `*name` can never make a malformed entry look like a successful parse.
  name->clear();
  *connected = false;

  size_t separator = entry.find(',');
  if (separator == std::string::npos) {
    return false;
  }

  // std::string is NUL-terminated, so atoi() is safe even when the comma is
  // the last character of the entry.
  *id = atoi(&entry[separator + 1]);
  name->assign(entry.substr(0, separator));

  separator = entry.find(',', separator + 1);
  if (separator != std::string::npos) {
    *connected = atoi(&entry[separator + 1]) != 0;
  }
  return !name->empty();
}
|
|
|
|
// Returns the integer that follows the first space in `response` (the HTTP
// status code of a status line), or -1 when `response` contains no space.
int PeerConnectionClient::GetResponseStatus(const std::string& response) {
  const size_t first_space = response.find(' ');
  if (first_space == std::string::npos) {
    return -1;
  }
  return atoi(&response[first_space + 1]);
}
|
|
|
|
bool PeerConnectionClient::ParseServerResponse(const std::string& response,
|
|
size_t content_length,
|
|
size_t* peer_id,
|
|
size_t* eoh) {
|
|
int status = GetResponseStatus(response.c_str());
|
|
if (status != 200) {
|
|
RTC_LOG(LS_ERROR) << "Received error from server";
|
|
Close();
|
|
callback_->OnDisconnected();
|
|
return false;
|
|
}
|
|
|
|
*eoh = response.find("\r\n\r\n");
|
|
RTC_DCHECK(*eoh != std::string::npos);
|
|
if (*eoh == std::string::npos)
|
|
return false;
|
|
|
|
*peer_id = -1;
|
|
|
|
// See comment in peer_channel.cc for why we use the Pragma header and
|
|
// not e.g. "X-Peer-Id".
|
|
GetHeaderValue(response, *eoh, "\r\nPragma: ", peer_id);
|
|
|
|
return true;
|
|
}
|
|
|
|
// Close handler shared by both sockets (also called by hand from
// ReadIntoBuffer() with err == 0).
//
// Connection refused:
//  * control socket: schedule a retry after kReconnectDelay;
//  * hanging-GET socket: close the client and report OnDisconnected().
// Any other close:
//  * hanging-GET socket: re-issue the hanging GET while CONNECTED;
//  * control socket: report OnMessageSent(err) to the observer.
void PeerConnectionClient::OnClose(rtc::AsyncSocket* socket, int err) {
  RTC_LOG(INFO) << __FUNCTION__;

  socket->Close();

#ifdef WIN32
  const bool connection_refused = (err == WSAECONNREFUSED);
#else
  const bool connection_refused = (err == ECONNREFUSED);
#endif

  if (connection_refused) {
    if (socket == control_socket_.get()) {
      RTC_LOG(WARNING) << "Connection refused; retrying in 2 seconds";
      rtc::Thread::Current()->PostDelayed(RTC_FROM_HERE, kReconnectDelay, this,
                                          0);
    } else {
      Close();
      callback_->OnDisconnected();
    }
    return;
  }

  if (socket != hanging_get_.get()) {
    callback_->OnMessageSent(err);
    return;
  }

  if (state_ == CONNECTED) {
    hanging_get_->Close();
    hanging_get_->Connect(server_address_);
  }
}
|
|
|
|
// MessageHandler callback. The only message this class posts to itself is the
// delayed "retry" from OnClose(), so `msg` is ignored and the connection
// attempt is simply started again.
void PeerConnectionClient::OnMessage(rtc::Message* msg) {
  DoConnect();
}
|