Support epoll in PhysicalSocketServer.
Only will be used if WEBRTC_POSIX and WEBRTC_LINUX are both defined and "epoll_create" doesn't return an error. Otherwise the default "select"-based IO loop will be used. BUG=webrtc:7585 Review-Url: https://codereview.webrtc.org/2880923002 Cr-Commit-Position: refs/heads/master@{#18359}
This commit is contained in:
@ -21,6 +21,10 @@
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
// "poll" will be used to wait for the signal dispatcher.
|
||||
#include <poll.h>
|
||||
#endif
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/select.h>
|
||||
@ -80,6 +84,16 @@ int64_t GetSocketRecvTimestamp(int socket) {
|
||||
typedef char* SockOptArg;
|
||||
#endif
|
||||
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
// POLLRDHUP / EPOLLRDHUP are only defined starting with Linux 2.6.17.
|
||||
#if !defined(POLLRDHUP)
|
||||
#define POLLRDHUP 0x2000
|
||||
#endif
|
||||
#if !defined(EPOLLRDHUP)
|
||||
#define EPOLLRDHUP 0x2000
|
||||
#endif
|
||||
#endif
|
||||
|
||||
namespace rtc {
|
||||
|
||||
std::unique_ptr<SocketServer> SocketServer::CreateDefault() {
|
||||
@ -774,6 +788,14 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
|
||||
#elif defined(WEBRTC_POSIX)
|
||||
|
||||
void SocketDispatcher::OnEvent(uint32_t ff, int err) {
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
// Remember currently enabled events so we can combine multiple changes
|
||||
// into one update call later.
|
||||
// The signal handlers might re-enable events disabled here, so we can't
|
||||
// keep a list of events to disable at the end of the method. This list
|
||||
// would not be updated with the events enabled by the signal handlers.
|
||||
StartBatchedEventUpdates();
|
||||
#endif
|
||||
// Make sure we deliver connect/accept first. Otherwise, consumers may see
|
||||
// something like a READ followed by a CONNECT, which would be odd.
|
||||
if ((ff & DE_CONNECT) != 0) {
|
||||
@ -797,10 +819,65 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
|
||||
SetEnabledEvents(0);
|
||||
SignalCloseEvent(this, err);
|
||||
}
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
FinishBatchedEventUpdates();
|
||||
#endif
|
||||
}
|
||||
|
||||
#endif // WEBRTC_POSIX
|
||||
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
|
||||
static int GetEpollEvents(uint32_t ff) {
|
||||
int events = 0;
|
||||
if (ff & (DE_READ | DE_ACCEPT)) {
|
||||
events |= EPOLLIN;
|
||||
}
|
||||
if (ff & (DE_WRITE | DE_CONNECT)) {
|
||||
events |= EPOLLOUT;
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
void SocketDispatcher::StartBatchedEventUpdates() {
|
||||
RTC_DCHECK_EQ(saved_enabled_events_, -1);
|
||||
saved_enabled_events_ = enabled_events();
|
||||
}
|
||||
|
||||
void SocketDispatcher::FinishBatchedEventUpdates() {
|
||||
RTC_DCHECK_NE(saved_enabled_events_, -1);
|
||||
uint8_t old_events = static_cast<uint8_t>(saved_enabled_events_);
|
||||
saved_enabled_events_ = -1;
|
||||
MaybeUpdateDispatcher(old_events);
|
||||
}
|
||||
|
||||
void SocketDispatcher::MaybeUpdateDispatcher(uint8_t old_events) {
|
||||
if (GetEpollEvents(enabled_events()) != GetEpollEvents(old_events) &&
|
||||
saved_enabled_events_ == -1) {
|
||||
ss_->Update(this);
|
||||
}
|
||||
}
|
||||
|
||||
void SocketDispatcher::SetEnabledEvents(uint8_t events) {
|
||||
uint8_t old_events = enabled_events();
|
||||
PhysicalSocket::SetEnabledEvents(events);
|
||||
MaybeUpdateDispatcher(old_events);
|
||||
}
|
||||
|
||||
void SocketDispatcher::EnableEvents(uint8_t events) {
|
||||
uint8_t old_events = enabled_events();
|
||||
PhysicalSocket::EnableEvents(events);
|
||||
MaybeUpdateDispatcher(old_events);
|
||||
}
|
||||
|
||||
void SocketDispatcher::DisableEvents(uint8_t events) {
|
||||
uint8_t old_events = enabled_events();
|
||||
PhysicalSocket::DisableEvents(events);
|
||||
MaybeUpdateDispatcher(old_events);
|
||||
}
|
||||
|
||||
#endif // WEBRTC_USE_EPOLL
|
||||
|
||||
int SocketDispatcher::Close() {
|
||||
if (s_ == INVALID_SOCKET)
|
||||
return 0;
|
||||
@ -1129,6 +1206,17 @@ class Signaler : public EventDispatcher {
|
||||
|
||||
PhysicalSocketServer::PhysicalSocketServer()
|
||||
: fWait_(false) {
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
// Since Linux 2.6.8, the size argument is ignored, but must be greater than
|
||||
// zero. Before that the size served as hint to the kernel for the amount of
|
||||
// space to initially allocate in internal data structures.
|
||||
epoll_fd_ = epoll_create(FD_SETSIZE);
|
||||
if (epoll_fd_ == -1) {
|
||||
// Not an error, will fall back to "select" below.
|
||||
LOG_E(LS_WARNING, EN, errno) << "epoll_create";
|
||||
epoll_fd_ = INVALID_SOCKET;
|
||||
}
|
||||
#endif
|
||||
signal_wakeup_ = new Signaler(this, &fWait_);
|
||||
#if defined(WEBRTC_WIN)
|
||||
socket_ev_ = WSACreateEvent();
|
||||
@ -1143,6 +1231,11 @@ PhysicalSocketServer::~PhysicalSocketServer() {
|
||||
signal_dispatcher_.reset();
|
||||
#endif
|
||||
delete signal_wakeup_;
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
if (epoll_fd_ != INVALID_SOCKET) {
|
||||
close(epoll_fd_);
|
||||
}
|
||||
#endif
|
||||
RTC_DCHECK(dispatchers_.empty());
|
||||
}
|
||||
|
||||
@ -1190,40 +1283,148 @@ AsyncSocket* PhysicalSocketServer::WrapSocket(SOCKET s) {
|
||||
|
||||
void PhysicalSocketServer::Add(Dispatcher *pdispatcher) {
|
||||
CritScope cs(&crit_);
|
||||
// Prevent duplicates. This can cause dead dispatchers to stick around.
|
||||
DispatcherList::iterator pos = std::find(dispatchers_.begin(),
|
||||
dispatchers_.end(),
|
||||
pdispatcher);
|
||||
if (pos != dispatchers_.end())
|
||||
return;
|
||||
dispatchers_.push_back(pdispatcher);
|
||||
if (processing_dispatchers_) {
|
||||
// A dispatcher is being added while a "Wait" call is processing the
|
||||
// list of socket events.
|
||||
// Defer adding to "dispatchers_" set until processing is done to avoid
|
||||
// invalidating the iterator in "Wait".
|
||||
pending_remove_dispatchers_.erase(pdispatcher);
|
||||
pending_add_dispatchers_.insert(pdispatcher);
|
||||
} else {
|
||||
dispatchers_.insert(pdispatcher);
|
||||
}
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
if (epoll_fd_ != INVALID_SOCKET) {
|
||||
AddEpoll(pdispatcher);
|
||||
}
|
||||
#endif // WEBRTC_USE_EPOLL
|
||||
}
|
||||
|
||||
void PhysicalSocketServer::Remove(Dispatcher *pdispatcher) {
|
||||
CritScope cs(&crit_);
|
||||
DispatcherList::iterator pos = std::find(dispatchers_.begin(),
|
||||
dispatchers_.end(),
|
||||
pdispatcher);
|
||||
// We silently ignore duplicate calls to Add, so we should silently ignore
|
||||
// the (expected) symmetric calls to Remove. Note that this may still hide
|
||||
// a real issue, so we at least log a warning about it.
|
||||
if (pos == dispatchers_.end()) {
|
||||
if (processing_dispatchers_) {
|
||||
// A dispatcher is being removed while a "Wait" call is processing the
|
||||
// list of socket events.
|
||||
// Defer removal from "dispatchers_" set until processing is done to avoid
|
||||
// invalidating the iterator in "Wait".
|
||||
if (!pending_add_dispatchers_.erase(pdispatcher) &&
|
||||
dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
||||
LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
|
||||
<< "dispatcher, potentially from a duplicate call to "
|
||||
<< "Add.";
|
||||
return;
|
||||
}
|
||||
|
||||
pending_remove_dispatchers_.insert(pdispatcher);
|
||||
} else if (!dispatchers_.erase(pdispatcher)) {
|
||||
LOG(LS_WARNING) << "PhysicalSocketServer asked to remove a unknown "
|
||||
<< "dispatcher, potentially from a duplicate call to Add.";
|
||||
return;
|
||||
}
|
||||
size_t index = pos - dispatchers_.begin();
|
||||
dispatchers_.erase(pos);
|
||||
for (IteratorList::iterator it = iterators_.begin(); it != iterators_.end();
|
||||
++it) {
|
||||
if (index < **it) {
|
||||
--**it;
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
if (epoll_fd_ != INVALID_SOCKET) {
|
||||
RemoveEpoll(pdispatcher);
|
||||
}
|
||||
#endif // WEBRTC_USE_EPOLL
|
||||
}
|
||||
|
||||
void PhysicalSocketServer::Update(Dispatcher* pdispatcher) {
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
if (epoll_fd_ == INVALID_SOCKET) {
|
||||
return;
|
||||
}
|
||||
|
||||
CritScope cs(&crit_);
|
||||
if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
UpdateEpoll(pdispatcher);
|
||||
#endif
|
||||
}
|
||||
|
||||
void PhysicalSocketServer::AddRemovePendingDispatchers() {
|
||||
if (!pending_add_dispatchers_.empty()) {
|
||||
for (Dispatcher* pdispatcher : pending_add_dispatchers_) {
|
||||
dispatchers_.insert(pdispatcher);
|
||||
}
|
||||
pending_add_dispatchers_.clear();
|
||||
}
|
||||
|
||||
if (!pending_remove_dispatchers_.empty()) {
|
||||
for (Dispatcher* pdispatcher : pending_remove_dispatchers_) {
|
||||
dispatchers_.erase(pdispatcher);
|
||||
}
|
||||
pending_remove_dispatchers_.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#if defined(WEBRTC_POSIX)
|
||||
|
||||
bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
// We don't keep a dedicated "epoll" descriptor containing only the non-IO
|
||||
// (i.e. signaling) dispatcher, so "poll" will be used instead of the default
|
||||
// "select" to support sockets larger than FD_SETSIZE.
|
||||
if (!process_io) {
|
||||
return WaitPoll(cmsWait, signal_wakeup_);
|
||||
} else if (epoll_fd_ != INVALID_SOCKET) {
|
||||
return WaitEpoll(cmsWait);
|
||||
}
|
||||
#endif
|
||||
return WaitSelect(cmsWait, process_io);
|
||||
}
|
||||
|
||||
static void ProcessEvents(Dispatcher* dispatcher,
|
||||
bool readable,
|
||||
bool writable,
|
||||
bool check_error) {
|
||||
int errcode = 0;
|
||||
// TODO(pthatcher): Should we set errcode if getsockopt fails?
|
||||
if (check_error) {
|
||||
socklen_t len = sizeof(errcode);
|
||||
::getsockopt(dispatcher->GetDescriptor(), SOL_SOCKET, SO_ERROR, &errcode,
|
||||
&len);
|
||||
}
|
||||
|
||||
uint32_t ff = 0;
|
||||
|
||||
// Check readable descriptors. If we're waiting on an accept, signal
|
||||
// that. Otherwise we're waiting for data, check to see if we're
|
||||
// readable or really closed.
|
||||
// TODO(pthatcher): Only peek at TCP descriptors.
|
||||
if (readable) {
|
||||
if (dispatcher->GetRequestedEvents() & DE_ACCEPT) {
|
||||
ff |= DE_ACCEPT;
|
||||
} else if (errcode || dispatcher->IsDescriptorClosed()) {
|
||||
ff |= DE_CLOSE;
|
||||
} else {
|
||||
ff |= DE_READ;
|
||||
}
|
||||
}
|
||||
|
||||
// Check writable descriptors. If we're waiting on a connect, detect
|
||||
// success versus failure by the reaped error code.
|
||||
if (writable) {
|
||||
if (dispatcher->GetRequestedEvents() & DE_CONNECT) {
|
||||
if (!errcode) {
|
||||
ff |= DE_CONNECT;
|
||||
} else {
|
||||
ff |= DE_CLOSE;
|
||||
}
|
||||
} else {
|
||||
ff |= DE_WRITE;
|
||||
}
|
||||
}
|
||||
|
||||
// Tell the descriptor about the event.
|
||||
if (ff != 0) {
|
||||
dispatcher->OnPreEvent(ff);
|
||||
dispatcher->OnEvent(ff, errcode);
|
||||
}
|
||||
}
|
||||
|
||||
bool PhysicalSocketServer::WaitSelect(int cmsWait, bool process_io) {
|
||||
// Calculate timing information
|
||||
|
||||
struct timeval* ptvWait = nullptr;
|
||||
@ -1266,13 +1467,17 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||
int fdmax = -1;
|
||||
{
|
||||
CritScope cr(&crit_);
|
||||
for (size_t i = 0; i < dispatchers_.size(); ++i) {
|
||||
// TODO(jbauch): Support re-entrant waiting.
|
||||
RTC_DCHECK(!processing_dispatchers_);
|
||||
for (Dispatcher* pdispatcher : dispatchers_) {
|
||||
// Query dispatchers for read and write wait state
|
||||
Dispatcher *pdispatcher = dispatchers_[i];
|
||||
RTC_DCHECK(pdispatcher);
|
||||
if (!process_io && (pdispatcher != signal_wakeup_))
|
||||
continue;
|
||||
int fd = pdispatcher->GetDescriptor();
|
||||
// "select"ing a file descriptor that is equal to or larger than
|
||||
// FD_SETSIZE will result in undefined behavior.
|
||||
RTC_DCHECK_LT(fd, FD_SETSIZE);
|
||||
if (fd > fdmax)
|
||||
fdmax = fd;
|
||||
|
||||
@ -1306,55 +1511,28 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||
} else {
|
||||
// We have signaled descriptors
|
||||
CritScope cr(&crit_);
|
||||
for (size_t i = 0; i < dispatchers_.size(); ++i) {
|
||||
Dispatcher *pdispatcher = dispatchers_[i];
|
||||
processing_dispatchers_ = true;
|
||||
for (Dispatcher* pdispatcher : dispatchers_) {
|
||||
int fd = pdispatcher->GetDescriptor();
|
||||
uint32_t ff = 0;
|
||||
int errcode = 0;
|
||||
|
||||
// Reap any error code, which can be signaled through reads or writes.
|
||||
// TODO(pthatcher): Should we set errcode if getsockopt fails?
|
||||
if (FD_ISSET(fd, &fdsRead) || FD_ISSET(fd, &fdsWrite)) {
|
||||
socklen_t len = sizeof(errcode);
|
||||
::getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &len);
|
||||
}
|
||||
|
||||
// Check readable descriptors. If we're waiting on an accept, signal
|
||||
// that. Otherwise we're waiting for data, check to see if we're
|
||||
// readable or really closed.
|
||||
// TODO(pthatcher): Only peek at TCP descriptors.
|
||||
if (FD_ISSET(fd, &fdsRead)) {
|
||||
bool readable = FD_ISSET(fd, &fdsRead);
|
||||
if (readable) {
|
||||
FD_CLR(fd, &fdsRead);
|
||||
if (pdispatcher->GetRequestedEvents() & DE_ACCEPT) {
|
||||
ff |= DE_ACCEPT;
|
||||
} else if (errcode || pdispatcher->IsDescriptorClosed()) {
|
||||
ff |= DE_CLOSE;
|
||||
} else {
|
||||
ff |= DE_READ;
|
||||
}
|
||||
}
|
||||
|
||||
// Check writable descriptors. If we're waiting on a connect, detect
|
||||
// success versus failure by the reaped error code.
|
||||
if (FD_ISSET(fd, &fdsWrite)) {
|
||||
bool writable = FD_ISSET(fd, &fdsWrite);
|
||||
if (writable) {
|
||||
FD_CLR(fd, &fdsWrite);
|
||||
if (pdispatcher->GetRequestedEvents() & DE_CONNECT) {
|
||||
if (!errcode) {
|
||||
ff |= DE_CONNECT;
|
||||
} else {
|
||||
ff |= DE_CLOSE;
|
||||
}
|
||||
} else {
|
||||
ff |= DE_WRITE;
|
||||
}
|
||||
}
|
||||
|
||||
// Tell the descriptor about the event.
|
||||
if (ff != 0) {
|
||||
pdispatcher->OnPreEvent(ff);
|
||||
pdispatcher->OnEvent(ff, errcode);
|
||||
}
|
||||
// The error code can be signaled through reads or writes.
|
||||
ProcessEvents(pdispatcher, readable, writable, readable || writable);
|
||||
}
|
||||
|
||||
processing_dispatchers_ = false;
|
||||
// Process deferred dispatchers that have been added/removed while the
|
||||
// events were handled above.
|
||||
AddRemovePendingDispatchers();
|
||||
}
|
||||
|
||||
// Recalc the time remaining to wait. Doing it here means it doesn't get
|
||||
@ -1381,6 +1559,214 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||
return true;
|
||||
}
|
||||
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
|
||||
// Initial number of events to process with one call to "epoll_wait".
|
||||
static const size_t kInitialEpollEvents = 128;
|
||||
|
||||
// Maximum number of events to process with one call to "epoll_wait".
|
||||
static const size_t kMaxEpollEvents = 8192;
|
||||
|
||||
void PhysicalSocketServer::AddEpoll(Dispatcher* pdispatcher) {
|
||||
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
||||
int fd = pdispatcher->GetDescriptor();
|
||||
RTC_DCHECK(fd != INVALID_SOCKET);
|
||||
if (fd == INVALID_SOCKET) {
|
||||
return;
|
||||
}
|
||||
|
||||
struct epoll_event event = {0};
|
||||
event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
|
||||
event.data.ptr = pdispatcher;
|
||||
int err = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &event);
|
||||
RTC_DCHECK_EQ(err, 0);
|
||||
if (err == -1) {
|
||||
LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_ADD";
|
||||
}
|
||||
}
|
||||
|
||||
void PhysicalSocketServer::RemoveEpoll(Dispatcher* pdispatcher) {
|
||||
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
||||
int fd = pdispatcher->GetDescriptor();
|
||||
RTC_DCHECK(fd != INVALID_SOCKET);
|
||||
if (fd == INVALID_SOCKET) {
|
||||
return;
|
||||
}
|
||||
|
||||
struct epoll_event event = {0};
|
||||
int err = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &event);
|
||||
RTC_DCHECK(err == 0 || errno == ENOENT);
|
||||
if (err == -1) {
|
||||
if (errno == ENOENT) {
|
||||
// Socket has already been closed.
|
||||
LOG_E(LS_VERBOSE, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
|
||||
} else {
|
||||
LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_DEL";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void PhysicalSocketServer::UpdateEpoll(Dispatcher* pdispatcher) {
|
||||
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
||||
int fd = pdispatcher->GetDescriptor();
|
||||
RTC_DCHECK(fd != INVALID_SOCKET);
|
||||
if (fd == INVALID_SOCKET) {
|
||||
return;
|
||||
}
|
||||
|
||||
struct epoll_event event = {0};
|
||||
event.events = GetEpollEvents(pdispatcher->GetRequestedEvents());
|
||||
event.data.ptr = pdispatcher;
|
||||
int err = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &event);
|
||||
RTC_DCHECK_EQ(err, 0);
|
||||
if (err == -1) {
|
||||
LOG_E(LS_ERROR, EN, errno) << "epoll_ctl EPOLL_CTL_MOD";
|
||||
}
|
||||
}
|
||||
|
||||
bool PhysicalSocketServer::WaitEpoll(int cmsWait) {
|
||||
RTC_DCHECK(epoll_fd_ != INVALID_SOCKET);
|
||||
int64_t tvWait = -1;
|
||||
int64_t tvStop = -1;
|
||||
if (cmsWait != kForever) {
|
||||
tvWait = cmsWait;
|
||||
tvStop = TimeAfter(cmsWait);
|
||||
}
|
||||
|
||||
if (epoll_events_.empty()) {
|
||||
// The initial space to receive events is created only if epoll is used.
|
||||
epoll_events_.resize(kInitialEpollEvents);
|
||||
}
|
||||
|
||||
fWait_ = true;
|
||||
|
||||
while (fWait_) {
|
||||
// Wait then call handlers as appropriate
|
||||
// < 0 means error
|
||||
// 0 means timeout
|
||||
// > 0 means count of descriptors ready
|
||||
int n = epoll_wait(epoll_fd_, &epoll_events_[0],
|
||||
static_cast<int>(epoll_events_.size()),
|
||||
static_cast<int>(tvWait));
|
||||
if (n < 0) {
|
||||
if (errno != EINTR) {
|
||||
LOG_E(LS_ERROR, EN, errno) << "epoll";
|
||||
return false;
|
||||
}
|
||||
// Else ignore the error and keep going. If this EINTR was for one of the
|
||||
// signals managed by this PhysicalSocketServer, the
|
||||
// PosixSignalDeliveryDispatcher will be in the signaled state in the next
|
||||
// iteration.
|
||||
} else if (n == 0) {
|
||||
// If timeout, return success
|
||||
return true;
|
||||
} else {
|
||||
// We have signaled descriptors
|
||||
CritScope cr(&crit_);
|
||||
for (int i = 0; i < n; ++i) {
|
||||
const epoll_event& event = epoll_events_[i];
|
||||
Dispatcher* pdispatcher = static_cast<Dispatcher*>(event.data.ptr);
|
||||
if (dispatchers_.find(pdispatcher) == dispatchers_.end()) {
|
||||
// The dispatcher for this socket no longer exists.
|
||||
continue;
|
||||
}
|
||||
|
||||
bool readable = (event.events & (EPOLLIN | EPOLLPRI));
|
||||
bool writable = (event.events & EPOLLOUT);
|
||||
bool check_error = (event.events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP));
|
||||
|
||||
ProcessEvents(pdispatcher, readable, writable, check_error);
|
||||
}
|
||||
}
|
||||
|
||||
if (static_cast<size_t>(n) == epoll_events_.size() &&
|
||||
epoll_events_.size() < kMaxEpollEvents) {
|
||||
// We used the complete space to receive events, increase size for future
|
||||
// iterations.
|
||||
epoll_events_.resize(std::max(epoll_events_.size() * 2, kMaxEpollEvents));
|
||||
}
|
||||
|
||||
if (cmsWait != kForever) {
|
||||
tvWait = TimeDiff(tvStop, TimeMillis());
|
||||
if (tvWait < 0) {
|
||||
// Return success on timeout.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool PhysicalSocketServer::WaitPoll(int cmsWait, Dispatcher* dispatcher) {
|
||||
RTC_DCHECK(dispatcher);
|
||||
int64_t tvWait = -1;
|
||||
int64_t tvStop = -1;
|
||||
if (cmsWait != kForever) {
|
||||
tvWait = cmsWait;
|
||||
tvStop = TimeAfter(cmsWait);
|
||||
}
|
||||
|
||||
fWait_ = true;
|
||||
|
||||
struct pollfd fds = {0};
|
||||
int fd = dispatcher->GetDescriptor();
|
||||
fds.fd = fd;
|
||||
|
||||
while (fWait_) {
|
||||
uint32_t ff = dispatcher->GetRequestedEvents();
|
||||
fds.events = 0;
|
||||
if (ff & (DE_READ | DE_ACCEPT)) {
|
||||
fds.events |= POLLIN;
|
||||
}
|
||||
if (ff & (DE_WRITE | DE_CONNECT)) {
|
||||
fds.events |= POLLOUT;
|
||||
}
|
||||
fds.revents = 0;
|
||||
|
||||
// Wait then call handlers as appropriate
|
||||
// < 0 means error
|
||||
// 0 means timeout
|
||||
// > 0 means count of descriptors ready
|
||||
int n = poll(&fds, 1, static_cast<int>(tvWait));
|
||||
if (n < 0) {
|
||||
if (errno != EINTR) {
|
||||
LOG_E(LS_ERROR, EN, errno) << "poll";
|
||||
return false;
|
||||
}
|
||||
// Else ignore the error and keep going. If this EINTR was for one of the
|
||||
// signals managed by this PhysicalSocketServer, the
|
||||
// PosixSignalDeliveryDispatcher will be in the signaled state in the next
|
||||
// iteration.
|
||||
} else if (n == 0) {
|
||||
// If timeout, return success
|
||||
return true;
|
||||
} else {
|
||||
// We have signaled descriptors (should only be the passed dispatcher).
|
||||
RTC_DCHECK_EQ(n, 1);
|
||||
RTC_DCHECK_EQ(fds.fd, fd);
|
||||
|
||||
bool readable = (fds.revents & (POLLIN | POLLPRI));
|
||||
bool writable = (fds.revents & POLLOUT);
|
||||
bool check_error = (fds.revents & (POLLRDHUP | POLLERR | POLLHUP));
|
||||
|
||||
ProcessEvents(dispatcher, readable, writable, check_error);
|
||||
}
|
||||
|
||||
if (cmsWait != kForever) {
|
||||
tvWait = TimeDiff(tvStop, TimeMillis());
|
||||
if (tvWait < 0) {
|
||||
// Return success on timeout.
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
#endif // WEBRTC_USE_EPOLL
|
||||
|
||||
static void GlobalSignalHandler(int signum) {
|
||||
PosixSignalHandler::Instance()->OnPosixSignalReceived(signum);
|
||||
}
|
||||
@ -1454,12 +1840,13 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||
|
||||
{
|
||||
CritScope cr(&crit_);
|
||||
size_t i = 0;
|
||||
iterators_.push_back(&i);
|
||||
// Don't track dispatchers_.size(), because we want to pick up any new
|
||||
// dispatchers that were added while processing the loop.
|
||||
while (i < dispatchers_.size()) {
|
||||
Dispatcher* disp = dispatchers_[i++];
|
||||
// TODO(jbauch): Support re-entrant waiting.
|
||||
RTC_DCHECK(!processing_dispatchers_);
|
||||
|
||||
// Calling "CheckSignalClose" might remove a closed dispatcher from the
|
||||
// set. This must be deferred to prevent invalidating the iterator.
|
||||
processing_dispatchers_ = true;
|
||||
for (Dispatcher* disp : dispatchers_) {
|
||||
if (!process_io && (disp != signal_wakeup_))
|
||||
continue;
|
||||
SOCKET s = disp->GetSocket();
|
||||
@ -1474,8 +1861,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||
event_owners.push_back(disp);
|
||||
}
|
||||
}
|
||||
RTC_DCHECK(iterators_.back() == &i);
|
||||
iterators_.pop_back();
|
||||
|
||||
processing_dispatchers_ = false;
|
||||
// Process deferred dispatchers that have been added/removed while the
|
||||
// events were handled above.
|
||||
AddRemovePendingDispatchers();
|
||||
}
|
||||
|
||||
// Which is shorter, the delay wait or the asked wait?
|
||||
@ -1509,14 +1899,15 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||
int index = dw - WSA_WAIT_EVENT_0;
|
||||
if (index > 0) {
|
||||
--index; // The first event is the socket event
|
||||
event_owners[index]->OnPreEvent(0);
|
||||
event_owners[index]->OnEvent(0, 0);
|
||||
Dispatcher* disp = event_owners[index];
|
||||
// The dispatcher could have been removed while waiting for events.
|
||||
if (dispatchers_.find(disp) != dispatchers_.end()) {
|
||||
disp->OnPreEvent(0);
|
||||
disp->OnEvent(0, 0);
|
||||
}
|
||||
} else if (process_io) {
|
||||
size_t i = 0, end = dispatchers_.size();
|
||||
iterators_.push_back(&i);
|
||||
iterators_.push_back(&end); // Don't iterate over new dispatchers.
|
||||
while (i < end) {
|
||||
Dispatcher* disp = dispatchers_[i++];
|
||||
processing_dispatchers_ = true;
|
||||
for (Dispatcher* disp : dispatchers_) {
|
||||
SOCKET s = disp->GetSocket();
|
||||
if (s == INVALID_SOCKET)
|
||||
continue;
|
||||
@ -1577,10 +1968,11 @@ bool PhysicalSocketServer::Wait(int cmsWait, bool process_io) {
|
||||
}
|
||||
}
|
||||
}
|
||||
RTC_DCHECK(iterators_.back() == &end);
|
||||
iterators_.pop_back();
|
||||
RTC_DCHECK(iterators_.back() == &i);
|
||||
iterators_.pop_back();
|
||||
|
||||
processing_dispatchers_ = false;
|
||||
// Process deferred dispatchers that have been added/removed while the
|
||||
// events were handled above.
|
||||
AddRemovePendingDispatchers();
|
||||
}
|
||||
|
||||
// Reset the network event until new activity occurs
|
||||
|
||||
@ -11,7 +11,13 @@
|
||||
#ifndef WEBRTC_BASE_PHYSICALSOCKETSERVER_H__
|
||||
#define WEBRTC_BASE_PHYSICALSOCKETSERVER_H__
|
||||
|
||||
#if defined(WEBRTC_POSIX) && defined(WEBRTC_LINUX)
|
||||
#include <sys/epoll.h>
|
||||
#define WEBRTC_USE_EPOLL 1
|
||||
#endif
|
||||
|
||||
#include <memory>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
|
||||
#include "webrtc/base/nethelpers.h"
|
||||
@ -76,6 +82,7 @@ class PhysicalSocketServer : public SocketServer {
|
||||
|
||||
void Add(Dispatcher* dispatcher);
|
||||
void Remove(Dispatcher* dispatcher);
|
||||
void Update(Dispatcher* dispatcher);
|
||||
|
||||
#if defined(WEBRTC_POSIX)
|
||||
// Sets the function to be executed in response to the specified POSIX signal.
|
||||
@ -95,16 +102,30 @@ class PhysicalSocketServer : public SocketServer {
|
||||
#endif
|
||||
|
||||
private:
|
||||
typedef std::vector<Dispatcher*> DispatcherList;
|
||||
typedef std::vector<size_t*> IteratorList;
|
||||
typedef std::set<Dispatcher*> DispatcherSet;
|
||||
|
||||
void AddRemovePendingDispatchers();
|
||||
|
||||
#if defined(WEBRTC_POSIX)
|
||||
bool WaitSelect(int cms, bool process_io);
|
||||
static bool InstallSignal(int signum, void (*handler)(int));
|
||||
|
||||
std::unique_ptr<PosixSignalDispatcher> signal_dispatcher_;
|
||||
#endif
|
||||
DispatcherList dispatchers_;
|
||||
IteratorList iterators_;
|
||||
#endif // WEBRTC_POSIX
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
void AddEpoll(Dispatcher* dispatcher);
|
||||
void RemoveEpoll(Dispatcher* dispatcher);
|
||||
void UpdateEpoll(Dispatcher* dispatcher);
|
||||
bool WaitEpoll(int cms);
|
||||
bool WaitPoll(int cms, Dispatcher* dispatcher);
|
||||
|
||||
int epoll_fd_ = INVALID_SOCKET;
|
||||
std::vector<struct epoll_event> epoll_events_;
|
||||
#endif // WEBRTC_USE_EPOLL
|
||||
DispatcherSet dispatchers_;
|
||||
DispatcherSet pending_add_dispatchers_;
|
||||
DispatcherSet pending_remove_dispatchers_;
|
||||
bool processing_dispatchers_ = false;
|
||||
Signaler* signal_wakeup_;
|
||||
CriticalSection crit_;
|
||||
bool fWait_;
|
||||
@ -172,9 +193,9 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
|
||||
void MaybeRemapSendError();
|
||||
|
||||
uint8_t enabled_events() const { return enabled_events_; }
|
||||
void SetEnabledEvents(uint8_t events);
|
||||
void EnableEvents(uint8_t events);
|
||||
void DisableEvents(uint8_t events);
|
||||
virtual void SetEnabledEvents(uint8_t events);
|
||||
virtual void EnableEvents(uint8_t events);
|
||||
virtual void DisableEvents(uint8_t events);
|
||||
|
||||
static int TranslateOption(Option opt, int* slevel, int* sopt);
|
||||
|
||||
@ -220,13 +241,28 @@ class SocketDispatcher : public Dispatcher, public PhysicalSocket {
|
||||
|
||||
int Close() override;
|
||||
|
||||
#if defined(WEBRTC_WIN)
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
protected:
|
||||
void StartBatchedEventUpdates();
|
||||
void FinishBatchedEventUpdates();
|
||||
|
||||
void SetEnabledEvents(uint8_t events) override;
|
||||
void EnableEvents(uint8_t events) override;
|
||||
void DisableEvents(uint8_t events) override;
|
||||
#endif
|
||||
|
||||
private:
|
||||
#if defined(WEBRTC_WIN)
|
||||
static int next_id_;
|
||||
int id_;
|
||||
bool signal_close_;
|
||||
int signal_err_;
|
||||
#endif // WEBRTC_WIN
|
||||
#if defined(WEBRTC_USE_EPOLL)
|
||||
void MaybeUpdateDispatcher(uint8_t old_events);
|
||||
|
||||
int saved_enabled_events_ = -1;
|
||||
#endif
|
||||
};
|
||||
|
||||
} // namespace rtc
|
||||
|
||||
Reference in New Issue
Block a user