MXS-2196: Move dcb_accept into listener.cc

Moved the code into listener.cc as it's the only place where it is
used. Placed the DCB callback assignment into the DCB constructor as it
depended on static functions that were in dcb.cc.
This commit is contained in:
Markus Mäkelä 2018-12-01 21:13:21 +02:00
parent b3fbc6aa3d
commit 6cf672195a
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
4 changed files with 192 additions and 199 deletions

View File

@ -262,8 +262,7 @@ typedef enum
*/
void dcb_global_init();
int dcb_write(DCB*, GWBUF*);
DCB* dcb_accept(const SListener& listener);
int dcb_write(DCB*, GWBUF*);
DCB* dcb_alloc(dcb_role_t, const SListener&, SERVICE* service);
DCB* dcb_connect(struct server*, struct session*, const char*);
int dcb_read(DCB*, GWBUF**, int);

View File

@ -246,8 +246,6 @@ private:
// Handler for EPOLL_IN events
static uint32_t poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events);
friend DCB* dcb_accept(const SListener& listener);
};
/**

View File

@ -104,7 +104,6 @@ static void dcb_log_write_failure(DCB* dcb, GWBUF* queue, int eno);
static int gw_write(DCB* dcb, GWBUF* writeq, bool* stop_writing);
static int gw_write_SSL(DCB* dcb, GWBUF* writeq, bool* stop_writing);
static int dcb_log_errors_SSL(DCB* dcb, int ret);
static int dcb_accept_one_connection(int fd, struct sockaddr* client_conn);
static int dcb_listen_create_socket_inet(const char* host, uint16_t port);
static int dcb_listen_create_socket_unix(const char* path);
static int dcb_set_socket_option(int sockfd, int level, int optname, void* optval, socklen_t optlen);
@ -169,6 +168,11 @@ DCB::DCB(dcb_role_t role, const SListener& listener, SERVICE* service)
, service(service)
, last_read(mxs_clock())
{
if (high_water && low_water)
{
dcb_add_callback(this, DCB_REASON_HIGH_WATER, downstream_throttle_callback, NULL);
dcb_add_callback(this, DCB_REASON_LOW_WATER, downstream_throttle_callback, NULL);
}
}
DCB::~DCB()
@ -2302,200 +2306,6 @@ int dcb_connect_SSL(DCB* dcb)
return return_code;
}
/**
* @brief Accept a new client connection, given a listener, return new DCB
*
* Calls dcb_accept_one_connection to do the basic work of obtaining a new
* connection from a listener. If that succeeds, some settings are fixed and
* a client DCB is created to handle the new connection. Further DCB details
* are set before returning the new DCB to the caller, or returning NULL if
* no new connection could be achieved.
*
* @param listener Listener that has a new connection request
*
* @return DCB - The new client DCB for the new connection, or NULL if failed
*/
DCB* dcb_accept(const SListener& listener)
{
DCB* client_dcb = NULL;
int c_sock;
struct sockaddr_storage client_conn;
if ((c_sock = dcb_accept_one_connection(listener->fd(), (struct sockaddr*)&client_conn)) >= 0)
{
configure_network_socket(c_sock, client_conn.ss_family);
client_dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, listener, listener->service());
if (client_dcb == NULL)
{
MXS_ERROR("Failed to create DCB object for client connection.");
close(c_sock);
}
else
{
client_dcb->session = session_set_dummy(client_dcb);
client_dcb->fd = c_sock;
// get client address
if (client_conn.ss_family == AF_UNIX)
{
// client address
client_dcb->ip.ss_family = AF_UNIX;
client_dcb->remote = MXS_STRDUP_A("localhost");
client_dcb->path = MXS_STRDUP_A(listener->address());
}
else
{
/* client IP in raw data*/
memcpy(&client_dcb->ip, &client_conn, sizeof(client_conn));
/* client IP in string representation */
client_dcb->remote = (char*)MXS_CALLOC(INET6_ADDRSTRLEN + 1, sizeof(char));
if (client_dcb->remote)
{
void* ptr;
if (client_dcb->ip.ss_family == AF_INET)
{
ptr = &((struct sockaddr_in*)&client_dcb->ip)->sin_addr;
}
else
{
ptr = &((struct sockaddr_in6*)&client_dcb->ip)->sin6_addr;
}
inet_ntop(client_dcb->ip.ss_family,
ptr,
client_dcb->remote,
INET6_ADDRSTRLEN);
}
}
client_dcb->func = listener->protocol_func();
client_dcb->authfunc = listener->auth_func();
/** Allocate DCB specific authentication data */
if (client_dcb->authfunc.create
&& (client_dcb->authenticator_data =
client_dcb->authfunc.create(client_dcb->listener->auth_instance())) == NULL)
{
MXS_ERROR("Failed to create authenticator for client DCB");
dcb_close(client_dcb);
return NULL;
}
/* Register downstream throttling callbacks */
if (DCB_THROTTLING_ENABLED(client_dcb))
{
dcb_add_callback(client_dcb, DCB_REASON_HIGH_WATER, downstream_throttle_callback, NULL);
dcb_add_callback(client_dcb, DCB_REASON_LOW_WATER, downstream_throttle_callback, NULL);
}
if (client_dcb->service->max_connections
&& client_dcb->service->client_count >= client_dcb->service->max_connections)
{
// TODO: If connections can be queued, this is the place to put the
// TODO: connection on that queue.
if (client_dcb->func.connlimit)
{
client_dcb->func.connlimit(client_dcb, client_dcb->service->max_connections);
}
client_dcb->session->close_reason = SESSION_CLOSE_TOO_MANY_CONNECTIONS;
dcb_close(client_dcb);
client_dcb = NULL;
}
}
}
if (client_dcb)
{
mxb::atomic::add(&client_dcb->service->client_count, 1);
}
return client_dcb;
}
/**
* @brief Accept a new client connection, given listener, return file descriptor
*
* Up to 10 retries will be attempted in case of non-permanent errors. Calls
* the accept function and analyses the return, logging any errors and making
* an appropriate return.
*
* @param fd File descriptor to accept from
* @param client_conn Output where connection information is stored
*
* @return -1 for failure, or a file descriptor for the new connection
*/
static int dcb_accept_one_connection(int fd, struct sockaddr* client_conn)
{
int c_sock;
/* Try up to 10 times to get a file descriptor by use of accept */
for (int i = 0; i < 10; i++)
{
socklen_t client_len = sizeof(struct sockaddr_storage);
int eno = 0;
/* new connection from client */
c_sock = accept(fd, client_conn, &client_len);
eno = errno;
errno = 0;
if (c_sock == -1)
{
/* Did not get a file descriptor */
if (eno == EAGAIN || eno == EWOULDBLOCK)
{
/**
* We have processed all incoming connections, break out
* of loop for return of -1.
*/
break;
}
else if (eno == ENFILE || eno == EMFILE)
{
struct timespec ts1;
long long nanosecs;
/**
* Exceeded system's (ENFILE) or processes
* (EMFILE) max. number of files limit.
*/
/* Log an error the first time this happens */
if (i == 0)
{
MXS_ERROR("Failed to accept new client connection: %d, %s",
eno,
mxs_strerror(eno));
}
nanosecs = (long long)1000000 * 100 * i * i;
ts1.tv_sec = nanosecs / 1000000000;
ts1.tv_nsec = nanosecs % 1000000000;
nanosleep(&ts1, NULL);
/* Remain in loop for up to the loop limit, retries. */
}
else
{
/**
* Other error, log it then break out of loop for return of -1.
*/
MXS_ERROR("Failed to accept new client connection: %d, %s",
eno,
mxs_strerror(eno));
break;
}
}
else
{
break;
}
}
return c_sock;
}
/**
* @brief Set socket options, log an error if fails
*

View File

@ -13,6 +13,7 @@
#include <maxscale/listener.hh>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -826,6 +827,191 @@ int start_listening(const char* config)
return listener_socket;
}
/**
* @brief Accept a new client connection, given listener, return file descriptor
*
* Up to 10 retries will be attempted in case of non-permanent errors. Calls
* the accept function and analyses the return, logging any errors and making
* an appropriate return.
*
* @param fd File descriptor to accept from
* @param client_conn Output where connection information is stored
*
* @return -1 for failure, or a file descriptor for the new connection
*/
static int dcb_accept_one_connection(int fd, struct sockaddr* client_conn)
{
int c_sock;
/* Try up to 10 times to get a file descriptor by use of accept */
for (int i = 0; i < 10; i++)
{
socklen_t client_len = sizeof(struct sockaddr_storage);
int eno = 0;
/* new connection from client */
c_sock = accept(fd, client_conn, &client_len);
eno = errno;
errno = 0;
if (c_sock == -1)
{
/* Did not get a file descriptor */
if (eno == EAGAIN || eno == EWOULDBLOCK)
{
/**
* We have processed all incoming connections, break out
* of loop for return of -1.
*/
break;
}
else if (eno == ENFILE || eno == EMFILE)
{
struct timespec ts1;
long long nanosecs;
/**
* Exceeded system's (ENFILE) or processes
* (EMFILE) max. number of files limit.
*/
/* Log an error the first time this happens */
if (i == 0)
{
MXS_ERROR("Failed to accept new client connection: %d, %s",
eno,
mxs_strerror(eno));
}
nanosecs = (long long)1000000 * 100 * i * i;
ts1.tv_sec = nanosecs / 1000000000;
ts1.tv_nsec = nanosecs % 1000000000;
nanosleep(&ts1, NULL);
/* Remain in loop for up to the loop limit, retries. */
}
else
{
/**
* Other error, log it then break out of loop for return of -1.
*/
MXS_ERROR("Failed to accept new client connection: %d, %s",
eno,
mxs_strerror(eno));
break;
}
}
else
{
break;
}
}
return c_sock;
}
/**
* @brief Accept a new client connection, given a listener, return new DCB
*
* Calls dcb_accept_one_connection to do the basic work of obtaining a new
* connection from a listener. If that succeeds, some settings are fixed and
* a client DCB is created to handle the new connection. Further DCB details
* are set before returning the new DCB to the caller, or returning NULL if
* no new connection could be achieved.
*
* @param listener Listener that has a new connection request
*
* @return DCB - The new client DCB for the new connection, or NULL if failed
*/
DCB* dcb_accept(const SListener& listener)
{
DCB* client_dcb = NULL;
int c_sock;
struct sockaddr_storage client_conn;
if ((c_sock = dcb_accept_one_connection(listener->fd(), (struct sockaddr*)&client_conn)) >= 0)
{
configure_network_socket(c_sock, client_conn.ss_family);
client_dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, listener, listener->service());
if (client_dcb == NULL)
{
MXS_ERROR("Failed to create DCB object for client connection.");
close(c_sock);
}
else
{
client_dcb->session = session_set_dummy(client_dcb);
client_dcb->fd = c_sock;
// get client address
if (client_conn.ss_family == AF_UNIX)
{
// client address
client_dcb->ip.ss_family = AF_UNIX;
client_dcb->remote = MXS_STRDUP_A("localhost");
client_dcb->path = MXS_STRDUP_A(listener->address());
}
else
{
/* client IP in raw data*/
memcpy(&client_dcb->ip, &client_conn, sizeof(client_conn));
/* client IP in string representation */
client_dcb->remote = (char*)MXS_CALLOC(INET6_ADDRSTRLEN + 1, sizeof(char));
if (client_dcb->remote)
{
void* ptr;
if (client_dcb->ip.ss_family == AF_INET)
{
ptr = &((struct sockaddr_in*)&client_dcb->ip)->sin_addr;
}
else
{
ptr = &((struct sockaddr_in6*)&client_dcb->ip)->sin6_addr;
}
inet_ntop(client_dcb->ip.ss_family, ptr, client_dcb->remote, INET6_ADDRSTRLEN);
}
}
client_dcb->func = listener->protocol_func();
client_dcb->authfunc = listener->auth_func();
/** Allocate DCB specific authentication data */
if (client_dcb->authfunc.create
&& (client_dcb->authenticator_data =
client_dcb->authfunc.create(client_dcb->listener->auth_instance())) == NULL)
{
MXS_ERROR("Failed to create authenticator for client DCB");
dcb_close(client_dcb);
return NULL;
}
if (client_dcb->service->max_connections
&& client_dcb->service->client_count >= client_dcb->service->max_connections)
{
// TODO: If connections can be queued, this is the place to put the
// TODO: connection on that queue.
if (client_dcb->func.connlimit)
{
client_dcb->func.connlimit(client_dcb, client_dcb->service->max_connections);
}
client_dcb->session->close_reason = SESSION_CLOSE_TOO_MANY_CONNECTIONS;
dcb_close(client_dcb);
client_dcb = NULL;
}
}
}
if (client_dcb)
{
mxb::atomic::add(&client_dcb->service->client_count, 1);
}
return client_dcb;
}
}
bool Listener::listen_shared(std::string config_bind)