Allocate DCB on owning thread

The DCB is now fully allocated on the thread that owns it. This guarantees
that the owner is always correct when it is used.

The code in poll_add_dcb still manipulates which worker the DCB is
allocated. This needs to be removed and the detection of special needs
(maxadmin, maxinfo) must be moved into the listener.
This commit is contained in:
Markus Mäkelä
2019-04-26 10:16:03 +03:00
parent dc244342f8
commit 510cae2fe0
2 changed files with 103 additions and 84 deletions

View File

@ -271,9 +271,18 @@ private:
/**
* Accept a single client connection
*
* @param fd The opened file descriptor to which the client is connected to
* @param addr The network information
* @param host The host where the client is connecting from
*
* @return The new DCB or nullptr on error
*/
DCB* accept_one_dcb();
DCB* accept_one_dcb(int fd, const sockaddr_storage* addr, const char* host);
/**
* Accept all available client connections
*/
void accept_connections();
/**
* The file descriptor for accepting new connections

View File

@ -920,82 +920,85 @@ int start_listening(const std::string& host, uint16_t port)
return listener_socket;
}
// Helper struct that contains the network information of an accepted connection
struct ClientConn
{
int fd;
sockaddr_storage addr;
char host[INET6_ADDRSTRLEN + 1];
};
/**
* @brief Accept a new client connection
*
* @param fd File descriptor to accept from
* @param client_conn Output where connection information is stored
*
* @return -1 for failure, or a file descriptor for the new connection
* @return ClientConn with fd set to -1 on failure
*/
static int accept_one_connection(int fd, struct sockaddr* client_conn)
static ClientConn accept_one_connection(int fd)
{
socklen_t client_len = sizeof(struct sockaddr_storage);
ClientConn conn = {};
socklen_t client_len = sizeof(conn.addr);
conn.fd = accept(fd, (sockaddr*)&conn.addr, &client_len);
/* new connection from client */
int client_fd = accept(fd, client_conn, &client_len);
if (client_fd == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
if (conn.fd != -1)
{
MXS_ERROR("Failed to accept new client connection: %d, %s", errno, mxs_strerror(errno));
}
return client_fd;
}
}
DCB* Listener::accept_one_dcb()
{
DCB* client_dcb = NULL;
int c_sock;
struct sockaddr_storage client_conn;
if ((c_sock = accept_one_connection(fd(), (struct sockaddr*)&client_conn)) >= 0)
{
/* client IP in string representation */
char ipbuf[INET6_ADDRSTRLEN + 1] = "localhost";
void* ptr = nullptr;
if (client_conn.ss_family == AF_INET)
if (conn.addr.ss_family == AF_INET)
{
ptr = &((struct sockaddr_in*)&client_conn)->sin_addr;
ptr = &((struct sockaddr_in*)&conn.addr)->sin_addr;
}
else if (client_conn.ss_family == AF_INET6)
else if (conn.addr.ss_family == AF_INET6)
{
ptr = &((struct sockaddr_in6*)&client_conn)->sin6_addr;
ptr = &((struct sockaddr_in6*)&conn.addr)->sin6_addr;
}
if (ptr)
{
inet_ntop(client_conn.ss_family, ptr, ipbuf, INET6_ADDRSTRLEN);
inet_ntop(conn.addr.ss_family, ptr, conn.host, sizeof(conn.host) - 1);
}
else
{
strcpy(conn.host, "localhost");
}
configure_network_socket(c_sock, client_conn.ss_family);
configure_network_socket(conn.fd, conn.addr.ss_family);
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
MXS_ERROR("Failed to accept new client connection: %d, %s", errno, mxs_strerror(errno));
}
return conn;
}
}
DCB* Listener::accept_one_dcb(int fd, const sockaddr_storage* addr, const char* host)
{
mxs::Session* session = new(std::nothrow) mxs::Session(m_self);
if (!session)
{
MXS_OOM();
close(c_sock);
close(fd);
return NULL;
}
client_dcb = dcb_alloc(DCB::Role::CLIENT, session);
DCB* client_dcb = dcb_alloc(DCB::Role::CLIENT, session);
if (!client_dcb)
{
MXS_OOM();
close(c_sock);
close(fd);
delete session;
}
else
{
session->set_client_dcb(client_dcb);
memcpy(&client_dcb->ip, &client_conn, sizeof(client_conn));
client_dcb->fd = c_sock;
client_dcb->remote = MXS_STRDUP_A(ipbuf);
memcpy(&client_dcb->ip, addr, sizeof(*addr));
client_dcb->fd = fd;
client_dcb->remote = MXS_STRDUP_A(host);
/** Allocate DCB specific authentication data */
if (m_auth_func.create
@ -1022,7 +1025,6 @@ DCB* Listener::accept_one_dcb()
client_dcb = NULL;
}
}
}
if (client_dcb)
{
@ -1141,22 +1143,30 @@ bool Listener::listen()
uint32_t Listener::poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events)
{
Listener* listener = static_cast<Listener*>(data);
DCB* client_dcb;
listener->accept_connections();
return MXB_POLL_ACCEPT;
}
while ((client_dcb = listener->accept_one_dcb()))
void Listener::accept_connections()
{
if (listener->type() == Type::UNIQUE_TCP)
for (ClientConn conn = accept_one_connection(fd()); conn.fd != -1; conn = accept_one_connection(fd()))
{
listener->m_proto_func.accept(client_dcb);
if (type() == Type::UNIQUE_TCP)
{
if (DCB* dcb = accept_one_dcb(conn.fd, &conn.addr, conn.host))
{
m_proto_func.accept(dcb);
}
}
else
{
auto worker = mxs::RoutingWorker::pick_worker();
worker->execute([listener, client_dcb]() {
listener->m_proto_func.accept(client_dcb);
worker->execute([this, conn]() {
if (DCB* dcb = accept_one_dcb(conn.fd, &conn.addr, conn.host))
{
m_proto_func.accept(dcb);
}
}, mxs::RoutingWorker::EXECUTE_AUTO);
}
}
return 1;
}