Use std::function instead of sigslot for sending packets

Bug: webrtc:11943
Change-Id: I2df9908f5e2e2ded1f2c6fbf50ef415f73760b50
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/258787
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36541}
This commit is contained in:
Tomas Gunnarsson
2022-04-13 09:07:30 +00:00
committed by WebRTC LUCI CQ
parent f15189dbce
commit f22dfdddfe
6 changed files with 49 additions and 26 deletions

View File

@ -300,7 +300,10 @@ Connection::Connection(rtc::WeakPtr<Port> port,
connected_(true), connected_(true),
pruned_(false), pruned_(false),
use_candidate_attr_(false), use_candidate_attr_(false),
requests_(port_->thread()), requests_(port_->thread(),
[this](const void* data, size_t size, StunRequest* request) {
OnSendStunPacket(data, size, request);
}),
rtt_(DEFAULT_RTT), rtt_(DEFAULT_RTT),
last_ping_sent_(0), last_ping_sent_(0),
last_ping_received_(0), last_ping_received_(0),
@ -312,10 +315,6 @@ Connection::Connection(rtc::WeakPtr<Port> port,
field_trials_(&kDefaultFieldTrials), field_trials_(&kDefaultFieldTrials),
rtt_estimate_(DEFAULT_RTT_ESTIMATE_HALF_TIME_MS) { rtt_estimate_(DEFAULT_RTT_ESTIMATE_HALF_TIME_MS) {
RTC_DCHECK_RUN_ON(network_thread_); RTC_DCHECK_RUN_ON(network_thread_);
// All of our connections start in WAITING state.
// TODO(mallinath) - Start connections from STATE_FROZEN.
// Wire up to send stun packets
requests_.SignalSendPacket.connect(this, &Connection::OnSendStunPacket);
RTC_LOG(LS_INFO) << ToString() << ": Connection created"; RTC_LOG(LS_INFO) << ToString() << ": Connection created";
} }

View File

@ -169,7 +169,11 @@ UDPPort::UDPPort(rtc::Thread* thread,
username, username,
password, password,
field_trials), field_trials),
request_manager_(thread), request_manager_(
thread,
[this](const void* data, size_t size, StunRequest* request) {
OnSendPacket(data, size, request);
}),
socket_(socket), socket_(socket),
error_(0), error_(0),
ready_(false), ready_(false),
@ -195,7 +199,11 @@ UDPPort::UDPPort(rtc::Thread* thread,
username, username,
password, password,
field_trials), field_trials),
request_manager_(thread), request_manager_(
thread,
[this](const void* data, size_t size, StunRequest* request) {
OnSendPacket(data, size, request);
}),
socket_(nullptr), socket_(nullptr),
error_(0), error_(0),
ready_(false), ready_(false),
@ -218,7 +226,6 @@ bool UDPPort::Init() {
socket_->SignalSentPacket.connect(this, &UDPPort::OnSentPacket); socket_->SignalSentPacket.connect(this, &UDPPort::OnSentPacket);
socket_->SignalReadyToSend.connect(this, &UDPPort::OnReadyToSend); socket_->SignalReadyToSend.connect(this, &UDPPort::OnReadyToSend);
socket_->SignalAddressReady.connect(this, &UDPPort::OnLocalAddressReady); socket_->SignalAddressReady.connect(this, &UDPPort::OnLocalAddressReady);
request_manager_.SignalSendPacket.connect(this, &UDPPort::OnSendPacket);
return true; return true;
} }

View File

@ -43,7 +43,10 @@ const int STUN_MAX_RETRANSMISSIONS = 8; // Total sends: 9
// work well. // work well.
const int STUN_MAX_RTO = 8000; // milliseconds, or 5 doublings const int STUN_MAX_RTO = 8000; // milliseconds, or 5 doublings
StunRequestManager::StunRequestManager(rtc::Thread* thread) : thread_(thread) {} StunRequestManager::StunRequestManager(
rtc::Thread* thread,
std::function<void(const void*, size_t, StunRequest*)> send_packet)
: thread_(thread), send_packet_(std::move(send_packet)) {}
StunRequestManager::~StunRequestManager() = default; StunRequestManager::~StunRequestManager() = default;
@ -181,6 +184,13 @@ void StunRequestManager::OnRequestTimedOut(StunRequest* request) {
requests_.erase(request->id()); requests_.erase(request->id());
} }
void StunRequestManager::SendPacket(const void* data,
size_t size,
StunRequest* request) {
RTC_DCHECK_EQ(this, request->manager());
send_packet_(data, size, request);
}
StunRequest::StunRequest(StunRequestManager& manager) StunRequest::StunRequest(StunRequestManager& manager)
: manager_(manager), : manager_(manager),
msg_(new StunMessage()), msg_(new StunMessage()),
@ -239,7 +249,7 @@ void StunRequest::OnMessage(rtc::Message* pmsg) {
rtc::ByteBufferWriter buf; rtc::ByteBufferWriter buf;
msg_->Write(&buf); msg_->Write(&buf);
manager_.SignalSendPacket(buf.Data(), buf.Length(), this); manager_.SendPacket(buf.Data(), buf.Length(), this);
OnSent(); OnSent();
manager_.network_thread()->PostDelayed(RTC_FROM_HERE, resend_delay(), this, manager_.network_thread()->PostDelayed(RTC_FROM_HERE, resend_delay(), this,

View File

@ -14,13 +14,13 @@
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <functional>
#include <map> #include <map>
#include <memory> #include <memory>
#include <string> #include <string>
#include "api/transport/stun.h" #include "api/transport/stun.h"
#include "rtc_base/message_handler.h" #include "rtc_base/message_handler.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h" #include "rtc_base/thread.h"
namespace cricket { namespace cricket {
@ -38,7 +38,9 @@ const int STUN_TOTAL_TIMEOUT = 39750; // milliseconds
// response or determine that the request has timed out. // response or determine that the request has timed out.
class StunRequestManager { class StunRequestManager {
public: public:
explicit StunRequestManager(rtc::Thread* thread); StunRequestManager(
rtc::Thread* thread,
std::function<void(const void*, size_t, StunRequest*)> send_packet);
~StunRequestManager(); ~StunRequestManager();
// Starts sending the given request (perhaps after a delay). // Starts sending the given request (perhaps after a delay).
@ -70,14 +72,14 @@ class StunRequestManager {
// TODO(tommi): Use TaskQueueBase* instead of rtc::Thread. // TODO(tommi): Use TaskQueueBase* instead of rtc::Thread.
rtc::Thread* network_thread() const { return thread_; } rtc::Thread* network_thread() const { return thread_; }
// Raised when there are bytes to be sent. void SendPacket(const void* data, size_t size, StunRequest* request);
sigslot::signal3<const void*, size_t, StunRequest*> SignalSendPacket;
private: private:
typedef std::map<std::string, std::unique_ptr<StunRequest>> RequestMap; typedef std::map<std::string, std::unique_ptr<StunRequest>> RequestMap;
rtc::Thread* const thread_; rtc::Thread* const thread_;
RequestMap requests_ RTC_GUARDED_BY(thread_); RequestMap requests_ RTC_GUARDED_BY(thread_);
const std::function<void(const void*, size_t, StunRequest*)> send_packet_;
}; };
// Represents an individual request to be sent. The STUN message can either be // Represents an individual request to be sent. The STUN message can either be

View File

@ -42,14 +42,15 @@ int TotalDelay(int sends) {
class StunRequestTest : public ::testing::Test, public sigslot::has_slots<> { class StunRequestTest : public ::testing::Test, public sigslot::has_slots<> {
public: public:
StunRequestTest() StunRequestTest()
: manager_(rtc::Thread::Current()), : manager_(rtc::Thread::Current(),
[this](const void* data, size_t size, StunRequest* request) {
OnSendPacket(data, size, request);
}),
request_count_(0), request_count_(0),
response_(NULL), response_(NULL),
success_(false), success_(false),
failure_(false), failure_(false),
timeout_(false) { timeout_(false) {}
manager_.SignalSendPacket.connect(this, &StunRequestTest::OnSendPacket);
}
void OnSendPacket(const void* data, size_t size, StunRequest* req) { void OnSendPacket(const void* data, size_t size, StunRequest* req) {
request_count_++; request_count_++;

View File

@ -242,15 +242,17 @@ TurnPort::TurnPort(rtc::Thread* thread,
socket_(socket), socket_(socket),
error_(0), error_(0),
stun_dscp_value_(rtc::DSCP_NO_CHANGE), stun_dscp_value_(rtc::DSCP_NO_CHANGE),
request_manager_(thread), request_manager_(
thread,
[this](const void* data, size_t size, StunRequest* request) {
OnSendStunPacket(data, size, request);
}),
next_channel_number_(TURN_CHANNEL_NUMBER_START), next_channel_number_(TURN_CHANNEL_NUMBER_START),
state_(STATE_CONNECTING), state_(STATE_CONNECTING),
server_priority_(server_priority), server_priority_(server_priority),
allocate_mismatch_retries_(0), allocate_mismatch_retries_(0),
turn_customizer_(customizer), turn_customizer_(customizer),
field_trials_(field_trials) { field_trials_(field_trials) {}
request_manager_.SignalSendPacket.connect(this, &TurnPort::OnSendStunPacket);
}
TurnPort::TurnPort(rtc::Thread* thread, TurnPort::TurnPort(rtc::Thread* thread,
rtc::PacketSocketFactory* factory, rtc::PacketSocketFactory* factory,
@ -284,15 +286,17 @@ TurnPort::TurnPort(rtc::Thread* thread,
socket_(NULL), socket_(NULL),
error_(0), error_(0),
stun_dscp_value_(rtc::DSCP_NO_CHANGE), stun_dscp_value_(rtc::DSCP_NO_CHANGE),
request_manager_(thread), request_manager_(
thread,
[this](const void* data, size_t size, StunRequest* request) {
OnSendStunPacket(data, size, request);
}),
next_channel_number_(TURN_CHANNEL_NUMBER_START), next_channel_number_(TURN_CHANNEL_NUMBER_START),
state_(STATE_CONNECTING), state_(STATE_CONNECTING),
server_priority_(server_priority), server_priority_(server_priority),
allocate_mismatch_retries_(0), allocate_mismatch_retries_(0),
turn_customizer_(customizer), turn_customizer_(customizer),
field_trials_(field_trials) { field_trials_(field_trials) {}
request_manager_.SignalSendPacket.connect(this, &TurnPort::OnSendStunPacket);
}
TurnPort::~TurnPort() { TurnPort::~TurnPort() {
// TODO(juberti): Should this even be necessary? // TODO(juberti): Should this even be necessary?