Add methods to change enabled events in PhysicalSocket.

This is in preparation for "epoll" integration where additional code needs to
run when the enabled events change.

BUG=webrtc:7585

Review-Url: https://codereview.webrtc.org/2893723002
Cr-Commit-Position: refs/heads/master@{#18189}
This commit is contained in:
jbauch
2017-05-17 16:32:26 -07:00
committed by Commit bot
parent 855aeead57
commit 577f5dc60b
2 changed files with 46 additions and 25 deletions

View File

@ -121,7 +121,7 @@ static const int ICMP_PING_TIMEOUT_MILLIS = 10000u;
#endif
PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
: ss_(ss), s_(s), enabled_events_(0), error_(0),
: ss_(ss), s_(s), error_(0),
state_((s == INVALID_SOCKET) ? CS_CLOSED : CS_CONNECTED),
resolver_(nullptr) {
#if defined(WEBRTC_WIN)
@ -133,7 +133,7 @@ PhysicalSocket::PhysicalSocket(PhysicalSocketServer* ss, SOCKET s)
EnsureWinsockInit();
#endif
if (s_ != INVALID_SOCKET) {
enabled_events_ = DE_READ | DE_WRITE;
SetEnabledEvents(DE_READ | DE_WRITE);
int type = SOCK_STREAM;
socklen_t len = sizeof(type);
@ -153,8 +153,9 @@ bool PhysicalSocket::Create(int family, int type) {
s_ = ::socket(family, type, 0);
udp_ = (SOCK_DGRAM == type);
UpdateLastError();
if (udp_)
enabled_events_ = DE_READ | DE_WRITE;
if (udp_) {
SetEnabledEvents(DE_READ | DE_WRITE);
}
return s_ != INVALID_SOCKET;
}
@ -266,16 +267,17 @@ int PhysicalSocket::DoConnect(const SocketAddress& connect_addr) {
sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
int err = ::connect(s_, addr, static_cast<int>(len));
UpdateLastError();
uint8_t events = DE_READ | DE_WRITE;
if (err == 0) {
state_ = CS_CONNECTED;
} else if (IsBlockingError(GetError())) {
state_ = CS_CONNECTING;
enabled_events_ |= DE_CONNECT;
events |= DE_CONNECT;
} else {
return SOCKET_ERROR;
}
enabled_events_ |= DE_READ | DE_WRITE;
EnableEvents(events);
return 0;
}
@ -341,7 +343,7 @@ int PhysicalSocket::Send(const void* pv, size_t cb) {
RTC_DCHECK(sent <= static_cast<int>(cb));
if ((sent > 0 && sent < static_cast<int>(cb)) ||
(sent < 0 && IsBlockingError(GetError()))) {
enabled_events_ |= DE_WRITE;
EnableEvents(DE_WRITE);
}
return sent;
}
@ -366,7 +368,7 @@ int PhysicalSocket::SendTo(const void* buffer,
RTC_DCHECK(sent <= static_cast<int>(length));
if ((sent > 0 && sent < static_cast<int>(length)) ||
(sent < 0 && IsBlockingError(GetError()))) {
enabled_events_ |= DE_WRITE;
EnableEvents(DE_WRITE);
}
return sent;
}
@ -381,7 +383,7 @@ int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
LOG(LS_WARNING) << "EOF from socket; deferring close event";
// Must turn this back on so that the select() loop will notice the close
// event.
enabled_events_ |= DE_READ;
EnableEvents(DE_READ);
SetError(EWOULDBLOCK);
return SOCKET_ERROR;
}
@ -392,7 +394,7 @@ int PhysicalSocket::Recv(void* buffer, size_t length, int64_t* timestamp) {
int error = GetError();
bool success = (received >= 0) || IsBlockingError(error);
if (udp_ || success) {
enabled_events_ |= DE_READ;
EnableEvents(DE_READ);
}
if (!success) {
LOG_F(LS_VERBOSE) << "Error = " << error;
@ -418,7 +420,7 @@ int PhysicalSocket::RecvFrom(void* buffer,
int error = GetError();
bool success = (received >= 0) || IsBlockingError(error);
if (udp_ || success) {
enabled_events_ |= DE_READ;
EnableEvents(DE_READ);
}
if (!success) {
LOG_F(LS_VERBOSE) << "Error = " << error;
@ -431,7 +433,7 @@ int PhysicalSocket::Listen(int backlog) {
UpdateLastError();
if (err == 0) {
state_ = CS_CONNECTING;
enabled_events_ |= DE_ACCEPT;
EnableEvents(DE_ACCEPT);
#if !defined(NDEBUG)
dbg_addr_ = "Listening @ ";
dbg_addr_.append(GetLocalAddress().ToString());
@ -443,7 +445,7 @@ int PhysicalSocket::Listen(int backlog) {
AsyncSocket* PhysicalSocket::Accept(SocketAddress* out_addr) {
// Always re-subscribe DE_ACCEPT to make sure new incoming connections will
// trigger an event even if DoAccept returns an error here.
enabled_events_ |= DE_ACCEPT;
EnableEvents(DE_ACCEPT);
sockaddr_storage addr_storage;
socklen_t addr_len = sizeof(addr_storage);
sockaddr* addr = reinterpret_cast<sockaddr*>(&addr_storage);
@ -463,7 +465,7 @@ int PhysicalSocket::Close() {
UpdateLastError();
s_ = INVALID_SOCKET;
state_ = CS_CLOSED;
enabled_events_ = 0;
SetEnabledEvents(0);
if (resolver_) {
resolver_->Destroy(false);
resolver_ = nullptr;
@ -525,6 +527,18 @@ void PhysicalSocket::MaybeRemapSendError() {
#endif
}
void PhysicalSocket::SetEnabledEvents(uint8_t events) {
enabled_events_ = events;
}
void PhysicalSocket::EnableEvents(uint8_t events) {
enabled_events_ |= events;
}
void PhysicalSocket::DisableEvents(uint8_t events) {
enabled_events_ &= ~events;
}
int PhysicalSocket::TranslateOption(Option opt, int* slevel, int* sopt) {
switch (opt) {
case OPT_DONTFRAGMENT:
@ -699,7 +713,7 @@ bool SocketDispatcher::IsDescriptorClosed() {
#endif // WEBRTC_POSIX
uint32_t SocketDispatcher::GetRequestedEvents() {
return enabled_events_;
return enabled_events();
}
void SocketDispatcher::OnPreEvent(uint32_t ff) {
@ -723,7 +737,7 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
if (((ff & DE_CONNECT) != 0) && (id_ == cache_id)) {
if (ff != DE_CONNECT)
LOG(LS_VERBOSE) << "Signalled with DE_CONNECT: " << ff;
enabled_events_ &= ~DE_CONNECT;
DisableEvents(DE_CONNECT);
#if !defined(NDEBUG)
dbg_addr_ = "Connected @ ";
dbg_addr_.append(GetRemoteAddress().ToString());
@ -731,15 +745,15 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
SignalConnectEvent(this);
}
if (((ff & DE_ACCEPT) != 0) && (id_ == cache_id)) {
enabled_events_ &= ~DE_ACCEPT;
DisableEvents(DE_ACCEPT);
SignalReadEvent(this);
}
if ((ff & DE_READ) != 0) {
enabled_events_ &= ~DE_READ;
DisableEvents(DE_READ);
SignalReadEvent(this);
}
if (((ff & DE_WRITE) != 0) && (id_ == cache_id)) {
enabled_events_ &= ~DE_WRITE;
DisableEvents(DE_WRITE);
SignalWriteEvent(this);
}
if (((ff & DE_CLOSE) != 0) && (id_ == cache_id)) {
@ -754,24 +768,24 @@ void SocketDispatcher::OnEvent(uint32_t ff, int err) {
// Make sure we deliver connect/accept first. Otherwise, consumers may see
// something like a READ followed by a CONNECT, which would be odd.
if ((ff & DE_CONNECT) != 0) {
enabled_events_ &= ~DE_CONNECT;
DisableEvents(DE_CONNECT);
SignalConnectEvent(this);
}
if ((ff & DE_ACCEPT) != 0) {
enabled_events_ &= ~DE_ACCEPT;
DisableEvents(DE_ACCEPT);
SignalReadEvent(this);
}
if ((ff & DE_READ) != 0) {
enabled_events_ &= ~DE_READ;
DisableEvents(DE_READ);
SignalReadEvent(this);
}
if ((ff & DE_WRITE) != 0) {
enabled_events_ &= ~DE_WRITE;
DisableEvents(DE_WRITE);
SignalWriteEvent(this);
}
if ((ff & DE_CLOSE) != 0) {
// The socket is now dead to us, so stop checking it.
enabled_events_ = 0;
SetEnabledEvents(0);
SignalCloseEvent(this, err);
}
}

View File

@ -171,11 +171,15 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
void UpdateLastError();
void MaybeRemapSendError();
uint8_t enabled_events() const { return enabled_events_; }
void SetEnabledEvents(uint8_t events);
void EnableEvents(uint8_t events);
void DisableEvents(uint8_t events);
static int TranslateOption(Option opt, int* slevel, int* sopt);
PhysicalSocketServer* ss_;
SOCKET s_;
uint8_t enabled_events_;
bool udp_;
CriticalSection crit_;
int error_ GUARDED_BY(crit_);
@ -185,6 +189,9 @@ class PhysicalSocket : public AsyncSocket, public sigslot::has_slots<> {
#if !defined(NDEBUG)
std::string dbg_addr_;
#endif
private:
uint8_t enabled_events_ = 0;
};
class SocketDispatcher : public Dispatcher, public PhysicalSocket {