diff --git a/server/core/config.c b/server/core/config.c index 3c3ae9f96..d964ba5fa 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -28,6 +28,7 @@ * 23/07/13 Mark Riddoch Addition on default monitor password * 06/02/14 Massimiliano Pinto Added support for enable/disable root user in services * 14/02/14 Massimiliano Pinto Added enable_root_user in the service_params list + * 11/03/14 Massimiliano Pinto Added Unix socket support * * @endverbatim */ @@ -344,14 +345,36 @@ int error_count = 0; char *address; char *port; char *protocol; + char *socket; service = config_get_value(obj->parameters, "service"); port = config_get_value(obj->parameters, "port"); address = config_get_value(obj->parameters, "address"); protocol = config_get_value(obj->parameters, "protocol"); - - if (service && port && protocol) - { + socket = config_get_value(obj->parameters, "socket"); + + if (service && socket && protocol) { + CONFIG_CONTEXT *ptr = context; + while (ptr && strcmp(ptr->object, service) != 0) + ptr = ptr->next; + if (ptr && ptr->element) + { + serviceAddProtocol(ptr->element, + protocol, + socket, + 0); + } else { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Listener '%s', " + "service '%s' not found. " + "Listener will not execute for socket %s.", + obj->object, service, socket))); + error_count++; + } + } + + if (service && port && protocol) { CONFIG_CONTEXT *ptr = context; while (ptr && strcmp(ptr->object, service) != 0) ptr = ptr->next; @@ -763,11 +786,35 @@ SERVER *server; char *port; char *protocol; char *address; + char *socket; service = config_get_value(obj->parameters, "service"); address = config_get_value(obj->parameters, "address"); port = config_get_value(obj->parameters, "port"); protocol = config_get_value(obj->parameters, "protocol"); + socket = config_get_value(obj->parameters, "socket"); + + if (service && socket && protocol) + { + CONFIG_CONTEXT *ptr = context; + while (ptr && strcmp(ptr->object, service) != 0) + ptr = ptr->next; + + if (ptr && + ptr->element && + serviceHasProtocol(ptr->element, + protocol, + 0) == 0) + { + serviceAddProtocol(ptr->element, + protocol, + socket, + 0); + serviceStartProtocol(ptr->element, + protocol, + 0); + } + } if (service && port && protocol) { @@ -834,6 +881,8 @@ static char *listener_params[] = "service", "protocol", "port", + "address", + "socket", NULL }; diff --git a/server/core/poll.c b/server/core/poll.c index 22031a380..8e4bd211b 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -357,53 +357,6 @@ poll_waitevents(void *arg) dcb, STRDCBROLE(dcb->dcb_role)))); - if (ev & EPOLLERR) - { - int eno = gw_getsockerrno(dcb->fd); -#if defined(SS_DEBUG) - if (eno == 0) { - eno = dcb_fake_write_errno[dcb->fd]; - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Added fake errno %d. " - "%s", - pthread_self(), - eno, - strerror(eno)))); - } - dcb_fake_write_errno[dcb->fd] = 0; -#endif - if (eno != 0) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLERR due %d, %s.", - pthread_self(), - eno, - strerror(eno)))); - } - atomic_add(&pollStats.n_error, 1); - dcb->func.error(dcb); - } - if (ev & EPOLLHUP) - { - int eno = 0; - eno = gw_getsockerrno(dcb->fd); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLHUP on dcb %p, fd %d. " - "Errno %d, %s.", - pthread_self(), - dcb, - dcb->fd, - eno, - strerror(eno)))); - atomic_add(&pollStats.n_hup, 1); - dcb->func.hangup(dcb); - } if (ev & EPOLLOUT) { int eno = 0; @@ -480,6 +433,53 @@ poll_waitevents(void *arg) &dcb->dcb_read_lock); #endif } + if (ev & EPOLLERR) + { + int eno = gw_getsockerrno(dcb->fd); +#if defined(SS_DEBUG) + if (eno == 0) { + eno = dcb_fake_write_errno[dcb->fd]; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Added fake errno %d. " + "%s", + pthread_self(), + eno, + strerror(eno)))); + } + dcb_fake_write_errno[dcb->fd] = 0; +#endif + if (eno != 0) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLERR due %d, %s.", + pthread_self(), + eno, + strerror(eno)))); + } + atomic_add(&pollStats.n_error, 1); + dcb->func.error(dcb); + } + if (ev & EPOLLHUP) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)))); + atomic_add(&pollStats.n_hup, 1); + dcb->func.hangup(dcb); + } } /*< for */ no_op = FALSE; } diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 970937786..7cf06dd2f 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -47,6 +47,7 @@ #include #include #include +#include #include #include #include diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 2566264fd..2b4b68b65 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -32,6 +32,7 @@ * 24/02/2014 Massimiliano Pinto Added: on failed authentication a new users' table is loaded with time and frequency limitations * If current user is authenticated the new users' table will replace the old one * 28/02/2014 Massimiliano Pinto Added: client IPv4 in dcb->ipv4 and inet_ntop for string representation + * 11/03/2014 Massimiliano Pinto Added: Unix socket support * */ @@ -807,47 +808,94 @@ int gw_MySQLListener( { int l_so; struct sockaddr_in serv_addr; + struct sockaddr_un local_addr; + struct sockaddr *current_addr; int one = 1; int rc; - /* this gateway, as default, will bind on port 4404 for localhost only */ - if (!parse_bindconfig(config_bind, 4406, &serv_addr)) - return 0; - listen_dcb->fd = -1; - - // socket create - if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - fprintf(stderr, - "\n* Error: can't open listening socket due " - "error %i, %s.\n\n\t", - errno, - strerror(errno)); - return 0; + if (strchr(config_bind, '/')) { + char *tmp = strrchr(config_bind, ':'); + if (tmp) + *tmp = '\0'; + + // UNIX socket create + if ((l_so = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, + "\n* Error: can't create UNIX socket due " + "error %i, %s.\n\n\t", + errno, + strerror(errno)); + return 0; + } + memset(&local_addr, 0, sizeof(local_addr)); + local_addr.sun_family = AF_UNIX; + strncpy(local_addr.sun_path, config_bind, sizeof(local_addr.sun_path) - 1); + + current_addr = (struct sockaddr *) &local_addr; + + } else { + /* MaxScale, as default, will bind on port 4406 */ + if (!parse_bindconfig(config_bind, 4406, &serv_addr)) { + fprintf(stderr, "Error in parse_bindconfig for [%s]\n", config_bind); + return 0; + } + // TCP socket create + if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, + "\n* Error: can't create socket due " + "error %i, %s.\n\n\t", + errno, + strerror(errno)); + return 0; + } + + current_addr = (struct sockaddr *) &serv_addr; } + + listen_dcb->fd = -1; + // socket options setsockopt(l_so, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); // set NONBLOCKING mode - setnonblocking(l_so); + setnonblocking(l_so); + + /* get the right socket family for bind */ + switch (current_addr->sa_family) { + case AF_UNIX: + rc = unlink(config_bind); + if ( (rc == -1) && (errno!=ENOENT) ) { + fprintf(stderr, "Error unlink Unix Socket %s\n", config_bind); + } + + if (bind(l_so, (struct sockaddr *) &local_addr, sizeof(local_addr)) < 0) { + fprintf(stderr, + "\n* Bind failed due error %i, %s.\n", + errno, + strerror(errno)); + fprintf(stderr, "* Can't bind to %s\n\n", config_bind); + + return 0; + } + break; + + case AF_INET: + if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + fprintf(stderr, + "\n* Bind failed due error %i, %s.\n", + errno, + strerror(errno)); + fprintf(stderr, "* Can't bind to %s\n\n", config_bind); + + return 0; + } + break; + + default: + fprintf(stderr, "* Socket Family %i not supported\n", current_addr->sa_family); + return 0; + } - // bind address and port - if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - fprintf(stderr, - "\n* Bind failed due error %i, %s.\n", - errno, - strerror(errno)); - fprintf(stderr, "* Can't bind to %s\n\n", - config_bind); - return 0; - } - /* - fprintf(stderr, - ">> GATEWAY bind is: %s:%i. FD is %i\n", - address, - port, - l_so); - */ - rc = listen(l_so, 10 * SOMAXCONN); if (rc == 0) { @@ -863,11 +911,6 @@ int gw_MySQLListener( strerror(eno)); return 0; } - /* - fprintf(stderr, - ">> GATEWAY listen backlog queue is %i\n", - 10 * SOMAXCONN); - */ // assign l_so to dcb listen_dcb->fd = l_so; @@ -908,8 +951,8 @@ int gw_MySQLAccept(DCB *listener) DCB *client_dcb; MySQLProtocol *protocol; int c_sock; - struct sockaddr_in local; - socklen_t addrlen = sizeof(struct sockaddr_in); + struct sockaddr client_conn; + socklen_t client_len = sizeof(struct sockaddr_storage); int sendbuf = GW_BACKEND_SO_SNDBUF; socklen_t optlen = sizeof(sendbuf); int eno = 0; @@ -932,8 +975,8 @@ int gw_MySQLAccept(DCB *listener) #endif /* SS_DEBUG */ // new connection from client c_sock = accept(listener->fd, - (struct sockaddr *) &local, - &addrlen); + (struct sockaddr *) &client_conn, + &client_len); eno = errno; errno = 0; #if defined(SS_DEBUG) @@ -1020,19 +1063,38 @@ int gw_MySQLAccept(DCB *listener) setnonblocking(c_sock); client_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); + + if (client_dcb == NULL) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_MySQLAccept] Failed to create " + "dcb object for client connection.", + pthread_self()))); + rc = 1; + goto return_rc; + } + client_dcb->service = listener->session->service; client_dcb->fd = c_sock; - /* client IPv4 in raw data*/ - memcpy(&client_dcb->ipv4, &local, sizeof(struct sockaddr_in)); - - /* client IPv4 in string representation */ - client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char)); - if (client_dcb->remote != NULL) { - inet_ntop(AF_INET, &(client_dcb->ipv4).sin_addr, client_dcb->remote, INET_ADDRSTRLEN); + // get client address + if ( client_conn.sa_family == AF_UNIX) { + // client address + client_dcb->remote = strdup("localhost_from_socket"); + // set localhost IP for user authentication + (client_dcb->ipv4).sin_addr.s_addr = 0x0100007F; + } else { + /* client IPv4 in raw data*/ + memcpy(&client_dcb->ipv4, (struct sockaddr_in *)&client_conn, sizeof(struct sockaddr_in)); + /* client IPv4 in string representation */ + client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char)); + if (client_dcb->remote != NULL) { + inet_ntop(AF_INET, &(client_dcb->ipv4).sin_addr, client_dcb->remote, INET_ADDRSTRLEN); + } } protocol = mysql_protocol_init(client_dcb, c_sock); + ss_dassert(protocol != NULL); if (protocol == NULL) {