AllocationSequence: migrate from rtc::Message to TaskQueue.

AllocationSequence uses legacy rtc::Thread message handling. In order
to cancel callbacks it uses rtc::Thread::Clear() which uses locks and
necessitates looping through all currently queued (unbounded) messages
in the thread. In particular, these Clear calls are common during
negotiation and the probability of having a lot of queued messages is
high due to a long-running network thread function invoked on the
network thread.

Fix this by migrating AllocationSequence to task queues.

Bug: webrtc:12840, webrtc:9702
Change-Id: I42bbdb59fb2c88b50e866326ba15134dcc6ce691
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/221369
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34241}
This commit is contained in:
Markus Handell
2021-06-07 13:41:15 +02:00
committed by WebRTC LUCI CQ
parent dedcdfeba3
commit da8a45fdaa
2 changed files with 25 additions and 25 deletions

View File

@ -39,10 +39,6 @@ using rtc::CreateRandomId;
namespace cricket {
namespace {
enum {
MSG_ALLOCATION_PHASE,
};
const int PHASE_UDP = 0;
const int PHASE_RELAY = 1;
const int PHASE_TCP = 2;
@ -1261,10 +1257,6 @@ void AllocationSequence::OnNetworkFailed() {
Stop();
}
AllocationSequence::~AllocationSequence() {
session_->network_thread()->Clear(this);
}
void AllocationSequence::DisableEquivalentPhases(rtc::Network* network,
PortConfiguration* config,
uint32_t* flags) {
@ -1339,7 +1331,9 @@ void AllocationSequence::DisableEquivalentPhases(rtc::Network* network,
void AllocationSequence::Start() {
state_ = kRunning;
session_->network_thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATION_PHASE);
session_->network_thread()->PostTask(webrtc::ToQueuedTask(
safety_, [this, epoch = epoch_] { Process(epoch); }));
// Take a snapshot of the best IP, so that when DisableEquivalentPhases is
// called next time, we enable all phases if the best IP has since changed.
previous_best_ip_ = network_->GetBestIP();
@ -1349,16 +1343,18 @@ void AllocationSequence::Stop() {
// If the port is completed, don't set it to stopped.
if (state_ == kRunning) {
state_ = kStopped;
session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
// Cause further Process calls in the previous epoch to be ignored.
++epoch_;
}
}
void AllocationSequence::OnMessage(rtc::Message* msg) {
void AllocationSequence::Process(int epoch) {
RTC_DCHECK(rtc::Thread::Current() == session_->network_thread());
RTC_DCHECK(msg->message_id == MSG_ALLOCATION_PHASE);
const char* const PHASE_NAMES[kNumPhases] = {"Udp", "Relay", "Tcp"};
if (epoch != epoch_)
return;
// Perform all of the phases in the current step.
RTC_LOG(LS_INFO) << network_->ToString()
<< ": Allocation Phase=" << PHASE_NAMES[phase_];
@ -1384,13 +1380,15 @@ void AllocationSequence::OnMessage(rtc::Message* msg) {
if (state() == kRunning) {
++phase_;
session_->network_thread()->PostDelayed(RTC_FROM_HERE,
session_->allocator()->step_delay(),
this, MSG_ALLOCATION_PHASE);
session_->network_thread()->PostDelayedTask(
webrtc::ToQueuedTask(safety_,
[this, epoch = epoch_] { Process(epoch); }),
session_->allocator()->step_delay());
} else {
// If all phases in AllocationSequence are completed, no allocation
// steps needed further. Canceling pending signal.
session_->network_thread()->Clear(this, MSG_ALLOCATION_PHASE);
// No allocation steps needed further if all phases in AllocationSequence
// are completed. Cause further Process calls in the previous epoch to be
// ignored.
++epoch_;
port_allocation_complete_callback_();
}
}

View File

@ -327,8 +327,8 @@ class TurnPort;
// Performs the allocation of ports, in a sequenced (timed) manner, for a given
// network and IP address.
class AllocationSequence : public rtc::MessageHandler,
public sigslot::has_slots<> {
// This class is thread-compatible.
class AllocationSequence : public sigslot::has_slots<> {
public:
enum State {
kInit, // Initial state.
@ -350,7 +350,6 @@ class AllocationSequence : public rtc::MessageHandler,
PortConfiguration* config,
uint32_t flags,
std::function<void()> port_allocation_complete_callback);
~AllocationSequence() override;
void Init();
void Clear();
void OnNetworkFailed();
@ -372,9 +371,6 @@ class AllocationSequence : public rtc::MessageHandler,
void Start();
void Stop();
// MessageHandler
void OnMessage(rtc::Message* msg) override;
protected:
// For testing.
void CreateTurnPort(const RelayServerConfig& config);
@ -382,6 +378,7 @@ class AllocationSequence : public rtc::MessageHandler,
private:
typedef std::vector<ProtocolType> ProtocolList;
void Process(int epoch);
bool IsFlagSet(uint32_t flag) { return ((flags_ & flag) != 0); }
void CreateUDPPorts();
void CreateTCPPorts();
@ -411,6 +408,11 @@ class AllocationSequence : public rtc::MessageHandler,
std::vector<Port*> relay_ports_;
int phase_;
std::function<void()> port_allocation_complete_callback_;
// This counter is sampled and passed together with tasks when tasks are
// posted. If the sampled counter doesn't match |epoch_| on reception, the
// posted task is ignored.
int epoch_ = 0;
webrtc::ScopedTaskSafety safety_;
};
} // namespace cricket