diff --git a/include/maxscale/dcb.hh b/include/maxscale/dcb.hh index ba0f631bb..e438c7aae 100644 --- a/include/maxscale/dcb.hh +++ b/include/maxscale/dcb.hh @@ -262,8 +262,7 @@ typedef enum */ void dcb_global_init(); -int dcb_write(DCB*, GWBUF*); -DCB* dcb_accept(const SListener& listener); +int dcb_write(DCB*, GWBUF*); DCB* dcb_alloc(dcb_role_t, const SListener&, SERVICE* service); DCB* dcb_connect(struct server*, struct session*, const char*); int dcb_read(DCB*, GWBUF**, int); diff --git a/include/maxscale/listener.hh b/include/maxscale/listener.hh index 4a4b7fad9..fec07cfb0 100644 --- a/include/maxscale/listener.hh +++ b/include/maxscale/listener.hh @@ -246,8 +246,6 @@ private: // Handler for EPOLL_IN events static uint32_t poll_handler(MXB_POLL_DATA* data, MXB_WORKER* worker, uint32_t events); - - friend DCB* dcb_accept(const SListener& listener); }; /** diff --git a/server/core/dcb.cc b/server/core/dcb.cc index ca0fabbae..a4d9b9b65 100644 --- a/server/core/dcb.cc +++ b/server/core/dcb.cc @@ -104,7 +104,6 @@ static void dcb_log_write_failure(DCB* dcb, GWBUF* queue, int eno); static int gw_write(DCB* dcb, GWBUF* writeq, bool* stop_writing); static int gw_write_SSL(DCB* dcb, GWBUF* writeq, bool* stop_writing); static int dcb_log_errors_SSL(DCB* dcb, int ret); -static int dcb_accept_one_connection(int fd, struct sockaddr* client_conn); static int dcb_listen_create_socket_inet(const char* host, uint16_t port); static int dcb_listen_create_socket_unix(const char* path); static int dcb_set_socket_option(int sockfd, int level, int optname, void* optval, socklen_t optlen); @@ -169,6 +168,11 @@ DCB::DCB(dcb_role_t role, const SListener& listener, SERVICE* service) , service(service) , last_read(mxs_clock()) { + if (high_water && low_water) + { + dcb_add_callback(this, DCB_REASON_HIGH_WATER, downstream_throttle_callback, NULL); + dcb_add_callback(this, DCB_REASON_LOW_WATER, downstream_throttle_callback, NULL); + } } DCB::~DCB() @@ -2302,200 +2306,6 @@ int dcb_connect_SSL(DCB* dcb) return return_code; } -/** - * @brief Accept a new client connection, given a listener, return new DCB - * - * Calls dcb_accept_one_connection to do the basic work of obtaining a new - * connection from a listener. If that succeeds, some settings are fixed and - * a client DCB is created to handle the new connection. Further DCB details - * are set before returning the new DCB to the caller, or returning NULL if - * no new connection could be achieved. - * - * @param listener Listener that has a new connection request - * - * @return DCB - The new client DCB for the new connection, or NULL if failed - */ -DCB* dcb_accept(const SListener& listener) -{ - DCB* client_dcb = NULL; - int c_sock; - struct sockaddr_storage client_conn; - - if ((c_sock = dcb_accept_one_connection(listener->fd(), (struct sockaddr*)&client_conn)) >= 0) - { - configure_network_socket(c_sock, client_conn.ss_family); - - client_dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, listener, listener->service()); - - if (client_dcb == NULL) - { - MXS_ERROR("Failed to create DCB object for client connection."); - close(c_sock); - } - else - { - client_dcb->session = session_set_dummy(client_dcb); - client_dcb->fd = c_sock; - - // get client address - if (client_conn.ss_family == AF_UNIX) - { - // client address - client_dcb->ip.ss_family = AF_UNIX; - client_dcb->remote = MXS_STRDUP_A("localhost"); - client_dcb->path = MXS_STRDUP_A(listener->address()); - } - else - { - /* client IP in raw data*/ - memcpy(&client_dcb->ip, &client_conn, sizeof(client_conn)); - /* client IP in string representation */ - client_dcb->remote = (char*)MXS_CALLOC(INET6_ADDRSTRLEN + 1, sizeof(char)); - - if (client_dcb->remote) - { - void* ptr; - if (client_dcb->ip.ss_family == AF_INET) - { - ptr = &((struct sockaddr_in*)&client_dcb->ip)->sin_addr; - } - else - { - ptr = &((struct sockaddr_in6*)&client_dcb->ip)->sin6_addr; - } - - inet_ntop(client_dcb->ip.ss_family, - ptr, - client_dcb->remote, - INET6_ADDRSTRLEN); - } - } - - client_dcb->func = listener->protocol_func(); - client_dcb->authfunc = listener->auth_func(); - - /** Allocate DCB specific authentication data */ - if (client_dcb->authfunc.create - && (client_dcb->authenticator_data = - client_dcb->authfunc.create(client_dcb->listener->auth_instance())) == NULL) - { - MXS_ERROR("Failed to create authenticator for client DCB"); - dcb_close(client_dcb); - return NULL; - } - - /* Register downstream throttling callbacks */ - if (DCB_THROTTLING_ENABLED(client_dcb)) - { - dcb_add_callback(client_dcb, DCB_REASON_HIGH_WATER, downstream_throttle_callback, NULL); - dcb_add_callback(client_dcb, DCB_REASON_LOW_WATER, downstream_throttle_callback, NULL); - } - - if (client_dcb->service->max_connections - && client_dcb->service->client_count >= client_dcb->service->max_connections) - { - // TODO: If connections can be queued, this is the place to put the - // TODO: connection on that queue. - if (client_dcb->func.connlimit) - { - client_dcb->func.connlimit(client_dcb, client_dcb->service->max_connections); - } - client_dcb->session->close_reason = SESSION_CLOSE_TOO_MANY_CONNECTIONS; - dcb_close(client_dcb); - client_dcb = NULL; - } - } - } - - if (client_dcb) - { - mxb::atomic::add(&client_dcb->service->client_count, 1); - } - - return client_dcb; -} - -/** - * @brief Accept a new client connection, given listener, return file descriptor - * - * Up to 10 retries will be attempted in case of non-permanent errors. Calls - * the accept function and analyses the return, logging any errors and making - * an appropriate return. - * - * @param fd File descriptor to accept from - * @param client_conn Output where connection information is stored - * - * @return -1 for failure, or a file descriptor for the new connection - */ -static int dcb_accept_one_connection(int fd, struct sockaddr* client_conn) -{ - int c_sock; - - /* Try up to 10 times to get a file descriptor by use of accept */ - for (int i = 0; i < 10; i++) - { - socklen_t client_len = sizeof(struct sockaddr_storage); - int eno = 0; - - /* new connection from client */ - c_sock = accept(fd, client_conn, &client_len); - eno = errno; - errno = 0; - - if (c_sock == -1) - { - /* Did not get a file descriptor */ - if (eno == EAGAIN || eno == EWOULDBLOCK) - { - /** - * We have processed all incoming connections, break out - * of loop for return of -1. - */ - break; - } - else if (eno == ENFILE || eno == EMFILE) - { - struct timespec ts1; - long long nanosecs; - - /** - * Exceeded system's (ENFILE) or processes - * (EMFILE) max. number of files limit. - */ - - /* Log an error the first time this happens */ - if (i == 0) - { - MXS_ERROR("Failed to accept new client connection: %d, %s", - eno, - mxs_strerror(eno)); - } - nanosecs = (long long)1000000 * 100 * i * i; - ts1.tv_sec = nanosecs / 1000000000; - ts1.tv_nsec = nanosecs % 1000000000; - nanosleep(&ts1, NULL); - - /* Remain in loop for up to the loop limit, retries. */ - } - else - { - /** - * Other error, log it then break out of loop for return of -1. - */ - MXS_ERROR("Failed to accept new client connection: %d, %s", - eno, - mxs_strerror(eno)); - break; - } - } - else - { - break; - } - } - return c_sock; -} - /** * @brief Set socket options, log an error if fails * diff --git a/server/core/listener.cc b/server/core/listener.cc index 654e88f4c..b845a21f2 100644 --- a/server/core/listener.cc +++ b/server/core/listener.cc @@ -13,6 +13,7 @@ #include +#include #include #include #include @@ -826,6 +827,191 @@ int start_listening(const char* config) return listener_socket; } + + +/** + * @brief Accept a new client connection, given listener, return file descriptor + * + * Up to 10 retries will be attempted in case of non-permanent errors. Calls + * the accept function and analyses the return, logging any errors and making + * an appropriate return. + * + * @param fd File descriptor to accept from + * @param client_conn Output where connection information is stored + * + * @return -1 for failure, or a file descriptor for the new connection + */ +static int dcb_accept_one_connection(int fd, struct sockaddr* client_conn) +{ + int c_sock; + + /* Try up to 10 times to get a file descriptor by use of accept */ + for (int i = 0; i < 10; i++) + { + socklen_t client_len = sizeof(struct sockaddr_storage); + int eno = 0; + + /* new connection from client */ + c_sock = accept(fd, client_conn, &client_len); + eno = errno; + errno = 0; + + if (c_sock == -1) + { + /* Did not get a file descriptor */ + if (eno == EAGAIN || eno == EWOULDBLOCK) + { + /** + * We have processed all incoming connections, break out + * of loop for return of -1. + */ + break; + } + else if (eno == ENFILE || eno == EMFILE) + { + struct timespec ts1; + long long nanosecs; + + /** + * Exceeded system's (ENFILE) or processes + * (EMFILE) max. number of files limit. + */ + + /* Log an error the first time this happens */ + if (i == 0) + { + MXS_ERROR("Failed to accept new client connection: %d, %s", + eno, + mxs_strerror(eno)); + } + nanosecs = (long long)1000000 * 100 * i * i; + ts1.tv_sec = nanosecs / 1000000000; + ts1.tv_nsec = nanosecs % 1000000000; + nanosleep(&ts1, NULL); + + /* Remain in loop for up to the loop limit, retries. */ + } + else + { + /** + * Other error, log it then break out of loop for return of -1. + */ + MXS_ERROR("Failed to accept new client connection: %d, %s", + eno, + mxs_strerror(eno)); + break; + } + } + else + { + break; + } + } + return c_sock; +} + +/** + * @brief Accept a new client connection, given a listener, return new DCB + * + * Calls dcb_accept_one_connection to do the basic work of obtaining a new + * connection from a listener. If that succeeds, some settings are fixed and + * a client DCB is created to handle the new connection. Further DCB details + * are set before returning the new DCB to the caller, or returning NULL if + * no new connection could be achieved. + * + * @param listener Listener that has a new connection request + * + * @return DCB - The new client DCB for the new connection, or NULL if failed + */ +DCB* dcb_accept(const SListener& listener) +{ + DCB* client_dcb = NULL; + int c_sock; + struct sockaddr_storage client_conn; + + if ((c_sock = dcb_accept_one_connection(listener->fd(), (struct sockaddr*)&client_conn)) >= 0) + { + configure_network_socket(c_sock, client_conn.ss_family); + + client_dcb = dcb_alloc(DCB_ROLE_CLIENT_HANDLER, listener, listener->service()); + + if (client_dcb == NULL) + { + MXS_ERROR("Failed to create DCB object for client connection."); + close(c_sock); + } + else + { + client_dcb->session = session_set_dummy(client_dcb); + client_dcb->fd = c_sock; + + // get client address + if (client_conn.ss_family == AF_UNIX) + { + // client address + client_dcb->ip.ss_family = AF_UNIX; + client_dcb->remote = MXS_STRDUP_A("localhost"); + client_dcb->path = MXS_STRDUP_A(listener->address()); + } + else + { + /* client IP in raw data*/ + memcpy(&client_dcb->ip, &client_conn, sizeof(client_conn)); + /* client IP in string representation */ + client_dcb->remote = (char*)MXS_CALLOC(INET6_ADDRSTRLEN + 1, sizeof(char)); + + if (client_dcb->remote) + { + void* ptr; + if (client_dcb->ip.ss_family == AF_INET) + { + ptr = &((struct sockaddr_in*)&client_dcb->ip)->sin_addr; + } + else + { + ptr = &((struct sockaddr_in6*)&client_dcb->ip)->sin6_addr; + } + + inet_ntop(client_dcb->ip.ss_family, ptr, client_dcb->remote, INET6_ADDRSTRLEN); + } + } + + client_dcb->func = listener->protocol_func(); + client_dcb->authfunc = listener->auth_func(); + + /** Allocate DCB specific authentication data */ + if (client_dcb->authfunc.create + && (client_dcb->authenticator_data = + client_dcb->authfunc.create(client_dcb->listener->auth_instance())) == NULL) + { + MXS_ERROR("Failed to create authenticator for client DCB"); + dcb_close(client_dcb); + return NULL; + } + + if (client_dcb->service->max_connections + && client_dcb->service->client_count >= client_dcb->service->max_connections) + { + // TODO: If connections can be queued, this is the place to put the + // TODO: connection on that queue. + if (client_dcb->func.connlimit) + { + client_dcb->func.connlimit(client_dcb, client_dcb->service->max_connections); + } + client_dcb->session->close_reason = SESSION_CLOSE_TOO_MANY_CONNECTIONS; + dcb_close(client_dcb); + client_dcb = NULL; + } + } + } + + if (client_dcb) + { + mxb::atomic::add(&client_dcb->service->client_count, 1); + } + + return client_dcb; +} } bool Listener::listen_shared(std::string config_bind)