Revert "Port: migrate to TaskQueue."
This reverts commit 06540166ca97028454adea48cec9bf109b771ddc. Reason for revert: breaks downstream test. Original change's description: > Port: migrate to TaskQueue. > > Port uses legacy rtc::Thread message handling. In order > to cancel callbacks it uses rtc::Thread::Clear() which uses locks and > necessitates looping through all currently queued (unbounded) messages > in the thread. In particular, these Clear calls are common during > negotiation and the probability of having a lot of queued messages is > high due to a long-running network thread function invoked on the > network thread. > > Fix this by migrating Port to task queues. > > > Bug: webrtc:12840, webrtc:9702 > Change-Id: I6c6fb83323899b56091f0857a1c2d15d19199002 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221370 > Reviewed-by: Harald Alvestrand <hta@webrtc.org> > Commit-Queue: Markus Handell <handellm@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#34338} TBR=hta@webrtc.org,handellm@webrtc.org,webrtc-scoped@luci-project-accounts.iam.gserviceaccount.com Change-Id: I014ef9267d224c10595cfa1c12899eabe0093306 No-Presubmit: true No-Tree-Checks: true No-Try: true Bug: webrtc:12840, webrtc:9702 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/223062 Reviewed-by: Markus Handell <handellm@webrtc.org> Commit-Queue: Markus Handell <handellm@webrtc.org> Cr-Commit-Position: refs/heads/master@{#34339}
This commit is contained in:

committed by
WebRTC LUCI CQ

parent
06540166ca
commit
a4aabb9213
@ -32,7 +32,6 @@
|
|||||||
#include "rtc_base/string_encode.h"
|
#include "rtc_base/string_encode.h"
|
||||||
#include "rtc_base/string_utils.h"
|
#include "rtc_base/string_utils.h"
|
||||||
#include "rtc_base/strings/string_builder.h"
|
#include "rtc_base/strings/string_builder.h"
|
||||||
#include "rtc_base/task_utils/to_queued_task.h"
|
|
||||||
#include "rtc_base/third_party/base64/base64.h"
|
#include "rtc_base/third_party/base64/base64.h"
|
||||||
#include "rtc_base/trace_event.h"
|
#include "rtc_base/trace_event.h"
|
||||||
#include "system_wrappers/include/field_trial.h"
|
#include "system_wrappers/include/field_trial.h"
|
||||||
@ -174,13 +173,15 @@ void Port::Construct() {
|
|||||||
network_->SignalTypeChanged.connect(this, &Port::OnNetworkTypeChanged);
|
network_->SignalTypeChanged.connect(this, &Port::OnNetworkTypeChanged);
|
||||||
network_cost_ = network_->GetCost();
|
network_cost_ = network_->GetCost();
|
||||||
|
|
||||||
ScheduleDelayedDestructionIfDead();
|
thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this,
|
||||||
|
MSG_DESTROY_IF_DEAD);
|
||||||
RTC_LOG(LS_INFO) << ToString() << ": Port created with network cost "
|
RTC_LOG(LS_INFO) << ToString() << ": Port created with network cost "
|
||||||
<< network_cost_;
|
<< network_cost_;
|
||||||
}
|
}
|
||||||
|
|
||||||
Port::~Port() {
|
Port::~Port() {
|
||||||
RTC_DCHECK_RUN_ON(thread_);
|
RTC_DCHECK_RUN_ON(thread_);
|
||||||
|
CancelPendingTasks();
|
||||||
|
|
||||||
// Delete all of the remaining connections. We copy the list up front
|
// Delete all of the remaining connections. We copy the list up front
|
||||||
// because each deletion will cause it to be modified.
|
// because each deletion will cause it to be modified.
|
||||||
@ -821,11 +822,19 @@ void Port::KeepAliveUntilPruned() {
|
|||||||
|
|
||||||
void Port::Prune() {
|
void Port::Prune() {
|
||||||
state_ = State::PRUNED;
|
state_ = State::PRUNED;
|
||||||
thread_->PostTask(webrtc::ToQueuedTask(safety_, [this] { DestroyIfDead(); }));
|
thread_->Post(RTC_FROM_HERE, this, MSG_DESTROY_IF_DEAD);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Port::DestroyIfDead() {
|
// Call to stop any currently pending operations from running.
|
||||||
|
void Port::CancelPendingTasks() {
|
||||||
|
TRACE_EVENT0("webrtc", "Port::CancelPendingTasks");
|
||||||
RTC_DCHECK_RUN_ON(thread_);
|
RTC_DCHECK_RUN_ON(thread_);
|
||||||
|
thread_->Clear(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Port::OnMessage(rtc::Message* pmsg) {
|
||||||
|
RTC_DCHECK_RUN_ON(thread_);
|
||||||
|
RTC_DCHECK(pmsg->message_id == MSG_DESTROY_IF_DEAD);
|
||||||
bool dead =
|
bool dead =
|
||||||
(state_ == State::INIT || state_ == State::PRUNED) &&
|
(state_ == State::INIT || state_ == State::PRUNED) &&
|
||||||
connections_.empty() &&
|
connections_.empty() &&
|
||||||
@ -849,12 +858,6 @@ void Port::OnNetworkTypeChanged(const rtc::Network* network) {
|
|||||||
UpdateNetworkCost();
|
UpdateNetworkCost();
|
||||||
}
|
}
|
||||||
|
|
||||||
void Port::ScheduleDelayedDestructionIfDead() {
|
|
||||||
thread_->PostDelayedTask(
|
|
||||||
webrtc::ToQueuedTask(safety_, [this] { DestroyIfDead(); }),
|
|
||||||
timeout_delay_);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::string Port::ToString() const {
|
std::string Port::ToString() const {
|
||||||
rtc::StringBuilder ss;
|
rtc::StringBuilder ss;
|
||||||
ss << "Port[" << rtc::ToHex(reinterpret_cast<uintptr_t>(this)) << ":"
|
ss << "Port[" << rtc::ToHex(reinterpret_cast<uintptr_t>(this)) << ":"
|
||||||
@ -905,7 +908,8 @@ void Port::OnConnectionDestroyed(Connection* conn) {
|
|||||||
// not cause the Port to be destroyed.
|
// not cause the Port to be destroyed.
|
||||||
if (connections_.empty()) {
|
if (connections_.empty()) {
|
||||||
last_time_all_connections_removed_ = rtc::TimeMillis();
|
last_time_all_connections_removed_ = rtc::TimeMillis();
|
||||||
ScheduleDelayedDestructionIfDead();
|
thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this,
|
||||||
|
MSG_DESTROY_IF_DEAD);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,7 +41,6 @@
|
|||||||
#include "rtc_base/rate_tracker.h"
|
#include "rtc_base/rate_tracker.h"
|
||||||
#include "rtc_base/socket_address.h"
|
#include "rtc_base/socket_address.h"
|
||||||
#include "rtc_base/system/rtc_export.h"
|
#include "rtc_base/system/rtc_export.h"
|
||||||
#include "rtc_base/task_utils/pending_task_safety_flag.h"
|
|
||||||
#include "rtc_base/third_party/sigslot/sigslot.h"
|
#include "rtc_base/third_party/sigslot/sigslot.h"
|
||||||
#include "rtc_base/thread.h"
|
#include "rtc_base/thread.h"
|
||||||
#include "rtc_base/weak_ptr.h"
|
#include "rtc_base/weak_ptr.h"
|
||||||
@ -172,6 +171,7 @@ typedef std::set<rtc::SocketAddress> ServerAddresses;
|
|||||||
// connections to similar mechanisms of the other client. Subclasses of this
|
// connections to similar mechanisms of the other client. Subclasses of this
|
||||||
// one add support for specific mechanisms like local UDP ports.
|
// one add support for specific mechanisms like local UDP ports.
|
||||||
class Port : public PortInterface,
|
class Port : public PortInterface,
|
||||||
|
public rtc::MessageHandler,
|
||||||
public sigslot::has_slots<> {
|
public sigslot::has_slots<> {
|
||||||
public:
|
public:
|
||||||
// INIT: The state when a port is just created.
|
// INIT: The state when a port is just created.
|
||||||
@ -220,6 +220,9 @@ class Port : public PortInterface,
|
|||||||
// Allows a port to be destroyed if no connection is using it.
|
// Allows a port to be destroyed if no connection is using it.
|
||||||
void Prune();
|
void Prune();
|
||||||
|
|
||||||
|
// Call to stop any currently pending operations from running.
|
||||||
|
void CancelPendingTasks();
|
||||||
|
|
||||||
// The thread on which this port performs its I/O.
|
// The thread on which this port performs its I/O.
|
||||||
rtc::Thread* thread() { return thread_; }
|
rtc::Thread* thread() { return thread_; }
|
||||||
|
|
||||||
@ -325,6 +328,8 @@ class Port : public PortInterface,
|
|||||||
// Called if the port has no connections and is no longer useful.
|
// Called if the port has no connections and is no longer useful.
|
||||||
void Destroy();
|
void Destroy();
|
||||||
|
|
||||||
|
void OnMessage(rtc::Message* pmsg) override;
|
||||||
|
|
||||||
// Debugging description of this port
|
// Debugging description of this port
|
||||||
std::string ToString() const override;
|
std::string ToString() const override;
|
||||||
uint16_t min_port() { return min_port_; }
|
uint16_t min_port() { return min_port_; }
|
||||||
@ -375,6 +380,8 @@ class Port : public PortInterface,
|
|||||||
const rtc::SocketAddress& base_address);
|
const rtc::SocketAddress& base_address);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
enum { MSG_DESTROY_IF_DEAD = 0, MSG_FIRST_AVAILABLE };
|
||||||
|
|
||||||
virtual void UpdateNetworkCost();
|
virtual void UpdateNetworkCost();
|
||||||
|
|
||||||
void set_type(const std::string& type) { type_ = type; }
|
void set_type(const std::string& type) { type_ = type; }
|
||||||
@ -441,9 +448,8 @@ class Port : public PortInterface,
|
|||||||
void Construct();
|
void Construct();
|
||||||
// Called when one of our connections deletes itself.
|
// Called when one of our connections deletes itself.
|
||||||
void OnConnectionDestroyed(Connection* conn);
|
void OnConnectionDestroyed(Connection* conn);
|
||||||
|
|
||||||
void OnNetworkTypeChanged(const rtc::Network* network);
|
void OnNetworkTypeChanged(const rtc::Network* network);
|
||||||
void ScheduleDelayedDestructionIfDead();
|
|
||||||
void DestroyIfDead();
|
|
||||||
|
|
||||||
rtc::Thread* const thread_;
|
rtc::Thread* const thread_;
|
||||||
rtc::PacketSocketFactory* const factory_;
|
rtc::PacketSocketFactory* const factory_;
|
||||||
@ -493,7 +499,6 @@ class Port : public PortInterface,
|
|||||||
|
|
||||||
friend class Connection;
|
friend class Connection;
|
||||||
webrtc::CallbackList<PortInterface*> port_destroyed_callback_list_;
|
webrtc::CallbackList<PortInterface*> port_destroyed_callback_list_;
|
||||||
webrtc::ScopedTaskSafety safety_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace cricket
|
} // namespace cricket
|
||||||
|
@ -990,7 +990,7 @@ void TurnPort::OnMessage(rtc::Message* message) {
|
|||||||
Close();
|
Close();
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
RTC_NOTREACHED();
|
Port::OnMessage(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,7 +25,6 @@
|
|||||||
#include "p2p/client/basic_port_allocator.h"
|
#include "p2p/client/basic_port_allocator.h"
|
||||||
#include "rtc_base/async_packet_socket.h"
|
#include "rtc_base/async_packet_socket.h"
|
||||||
#include "rtc_base/async_resolver_interface.h"
|
#include "rtc_base/async_resolver_interface.h"
|
||||||
#include "rtc_base/message_handler.h"
|
|
||||||
#include "rtc_base/ssl_certificate.h"
|
#include "rtc_base/ssl_certificate.h"
|
||||||
#include "rtc_base/task_utils/pending_task_safety_flag.h"
|
#include "rtc_base/task_utils/pending_task_safety_flag.h"
|
||||||
|
|
||||||
@ -42,7 +41,7 @@ extern const char TURN_PORT_TYPE[];
|
|||||||
class TurnAllocateRequest;
|
class TurnAllocateRequest;
|
||||||
class TurnEntry;
|
class TurnEntry;
|
||||||
|
|
||||||
class TurnPort : public Port, public rtc::MessageHandler {
|
class TurnPort : public Port {
|
||||||
public:
|
public:
|
||||||
enum PortState {
|
enum PortState {
|
||||||
STATE_CONNECTING, // Initial state, cannot send any packets.
|
STATE_CONNECTING, // Initial state, cannot send any packets.
|
||||||
@ -299,7 +298,7 @@ class TurnPort : public Port, public rtc::MessageHandler {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
enum {
|
enum {
|
||||||
MSG_ALLOCATE_ERROR,
|
MSG_ALLOCATE_ERROR = MSG_FIRST_AVAILABLE,
|
||||||
MSG_ALLOCATE_MISMATCH,
|
MSG_ALLOCATE_MISMATCH,
|
||||||
MSG_TRY_ALTERNATE_SERVER,
|
MSG_TRY_ALTERNATE_SERVER,
|
||||||
MSG_REFRESH_ERROR,
|
MSG_REFRESH_ERROR,
|
||||||
|
Reference in New Issue
Block a user