diff --git a/server/modules/protocol/CMakeLists.txt b/server/modules/protocol/CMakeLists.txt index fa1c2ab34..0bea95832 100644 --- a/server/modules/protocol/CMakeLists.txt +++ b/server/modules/protocol/CMakeLists.txt @@ -6,6 +6,15 @@ add_library(MySQLBackend SHARED mysql_backend.c mysql_common.c) target_link_libraries(MySQLBackend log_manager utils) install(TARGETS MySQLBackend DESTINATION modules) +add_library(mongoclient SHARED mongo_client.c mysql_common.c) +target_link_libraries(mongoclient log_manager utils) +install(TARGETS mongoclient DESTINATION modules) + +add_library(mongobackend SHARED mongo_backend.c mysql_common.c) +target_link_libraries(mongobackend log_manager utils) +install(TARGETS mongobackend DESTINATION modules) + + add_library(telnetd SHARED telnetd.c) target_link_libraries(telnetd log_manager utils) install(TARGETS telnetd DESTINATION modules) diff --git a/server/modules/protocol/mongo_backend.c b/server/modules/protocol/mongo_backend.c new file mode 100644 index 000000000..4fe1ff0b8 --- /dev/null +++ b/server/modules/protocol/mongo_backend.c @@ -0,0 +1,1204 @@ +/* + * This file is distributed as part of the MariaDB Corporation MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ * + * Copyright MariaDB Corporation Ab 2013-2014 + */ + +#include "mysql_client_server_protocol.h" +#include +#include +#include +#include + +/* + * MySQL Protocol module for handling the protocol between the gateway + * and the backend MySQL database. + * + * Revision History + * Date Who Description + * 14/06/2013 Mark Riddoch Initial version + * 17/06/2013 Massimiliano Pinto Added MaxScale To Backends routines + * 01/07/2013 Massimiliano Pinto Put Log Manager example code behind SS_DEBUG macros. + * 03/07/2013 Massimiliano Pinto Added delayq for incoming data before mysql connection + * 04/07/2013 Massimiliano Pinto Added asyncrhronous MySQL protocol connection to backend + * 05/07/2013 Massimiliano Pinto Added closeSession if backend auth fails + * 12/07/2013 Massimiliano Pinto Added Mysql Change User via dcb->func.auth() + * 15/07/2013 Massimiliano Pinto Added Mysql session change via dcb->func.session() + * 17/07/2013 Massimiliano Pinto Added dcb->command update from gwbuf->command for proper routing + server replies to client via router->clientReply + * 04/09/2013 Massimiliano Pinto Added dcb->session and dcb->session->client checks for NULL + * 12/09/2013 Massimiliano Pinto Added checks in gw_read_backend_event() for gw_read_backend_handshake + * 27/09/2013 Massimiliano Pinto Changed in gw_read_backend_event the check for dcb_read(), now is if rc < 0 + * 24/10/2014 Massimiliano Pinto Added Mysql user@host @db authentication support + * 10/11/2014 Massimiliano Pinto Client charset is passed to backend + * + */ +#include + +MODULE_INFO info = { + MODULE_API_PROTOCOL, + MODULE_GA, + GWPROTOCOL_VERSION, + "The MySQL to backend server protocol" +}; + +/** Defined in log_manager.cc */ +extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; + +static char *version_str = "V2.0.0"; +static int gw_create_backend_connection(DCB *backend, SERVER *server, SESSION *in_session); +static int 
gw_read_backend_event(DCB* dcb); +static int gw_write_backend_event(DCB *dcb); +static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue); +static int gw_error_backend_event(DCB *dcb); +static int gw_backend_close(DCB *dcb); +static int gw_backend_hangup(DCB *dcb); +static int backend_write_delayqueue(DCB *dcb); +static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); +static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); +static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process); +extern char* create_auth_failed_msg( GWBUF* readbuf, char* hostaddr, uint8_t* sha1); +extern char* create_auth_fail_str(char *username, char *hostaddr, char *sha1, char *db); +static bool sescmd_response_complete(DCB* dcb); + + +#if defined(NOT_USED) + static int gw_session(DCB *backend_dcb, void *data); +#endif +static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb); + +static GWPROTOCOL MyObject = { + gw_read_backend_event, /* Read - EPOLLIN handler */ + gw_MySQLWrite_backend, /* Write - data from gateway */ + gw_write_backend_event, /* WriteReady - EPOLLOUT handler */ + gw_error_backend_event, /* Error - EPOLLERR handler */ + gw_backend_hangup, /* HangUp - EPOLLHUP handler */ + NULL, /* Accept */ + gw_create_backend_connection, /* Connect */ + gw_backend_close, /* Close */ + NULL, /* Listen */ + gw_change_user, /* Authentication */ + NULL /* Session */ +}; + +/* + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/* + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/* + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. 
+ * + * @return The module object + */ +GWPROTOCOL * +GetModuleObject() +{ + return &MyObject; +} + + +static MYSQL_session* gw_get_shared_session_auth_info( + DCB* dcb) +{ + MYSQL_session* auth_info = NULL; + CHK_DCB(dcb); + CHK_SESSION(dcb->session); + + spinlock_acquire(&dcb->session->ses_lock); + + if (dcb->session->state != SESSION_STATE_ALLOC) { + auth_info = dcb->session->data; + } else { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_get_shared_session_auth_info] Couldn't get " + "session authentication info. Session in a wrong state %d.", + pthread_self(), + dcb->session->state))); + } + spinlock_release(&dcb->session->ses_lock); + + return auth_info; +} + +/** + * Backend Read Event for EPOLLIN on the MySQL backend protocol module: reads backend data and hands it to the router's clientReply; input shorter than 5 bytes is not forwarded yet + * @param dcb The backend Descriptor Control Block + * @return 1 when data was handed to router->clientReply, otherwise 0 or the unchanged non-negative dcb_read() result + */ +static int gw_read_backend_event(DCB *dcb) { + MySQLProtocol *client_protocol = NULL; + MySQLProtocol *backend_protocol = NULL; + MYSQL_session *current_session = NULL; + int rc = 0; + + + backend_protocol = (MySQLProtocol *) dcb->protocol; + CHK_PROTOCOL(backend_protocol); + + + /* reading MySQL command output from backend and writing to the client */ + { + GWBUF *read_buffer = NULL; + ROUTER_OBJECT *router = NULL; + ROUTER *router_instance = NULL; + SESSION *session = dcb->session; + int nbytes_read = 0; + + CHK_SESSION(session); + router = session->service->router; + router_instance = session->service->router_instance; + + /* read available backend data */ + rc = dcb_read(dcb, &read_buffer); + + if (rc < 0) + { + GWBUF* errbuf; + bool succp; + + errbuf = mysql_create_custom_error( + 1, + 0, + "Read from backend failed"); + + router->handleError( + router_instance, + session->router_session, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + gwbuf_free(errbuf); + + if (!succp) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } 
+ ss_dassert(dcb->dcb_errhandle_called); + dcb_close(dcb); + rc = 0; + goto return_rc; + } + nbytes_read = gwbuf_length(read_buffer); + + if (nbytes_read == 0 && dcb->dcb_readqueue == NULL) + { + goto return_rc; + } + else + { + ss_dassert(read_buffer != NULL || dcb->dcb_readqueue != NULL); + } + + /** Packet prefix was read earlier */ + if (dcb->dcb_readqueue) + { + if (read_buffer != NULL) + { + read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer); + } + else + { + read_buffer = dcb->dcb_readqueue; + } + nbytes_read = gwbuf_length(read_buffer); + + if (nbytes_read < 5) /*< read at least command type */ + { + rc = 0; + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] Read %d bytes " + "from DCB %p, fd %d, session %p. " + "Returning to poll wait.\n", + pthread_self(), + nbytes_read, + dcb, + dcb->fd, + dcb->session))); + goto return_rc; + } + /** There is at least length and command type. */ + else + { + dcb->dcb_readqueue = NULL; + } + } + /** This may be either short prefix of a packet, or the tail of it. */ + else + { + if (nbytes_read < 5) + { + dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer); + rc = 0; + goto return_rc; + } + } + + + if (dcb->session->state == SESSION_STATE_ROUTER_READY && + dcb->session->client != NULL && + dcb->session->client->state == DCB_STATE_POLLING) + { + client_protocol = SESSION_PROTOCOL(dcb->session, + MySQLProtocol); + + { + gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); + router->clientReply(router_instance, session->router_session, read_buffer, dcb); + rc = 1; + } + } + else /*< session is closing; replying to client isn't possible */ + { + gwbuf_free(read_buffer); + } + } + +return_rc: + return rc; + +return_with_lock: + + goto return_rc; +} + +/* + * EPOLLOUT handler for the MySQL Backend protocol module. 
+ * + * @param dcb The descriptor control block + * @return 1 in success, 0 in case of failure, + */ +static int gw_write_backend_event(DCB *dcb) { + int rc = 0; + MySQLProtocol *backend_protocol = dcb->protocol; + + /*< + * Don't write to backend if backend_dcb is not in poll set anymore. + */ + if (dcb->state != DCB_STATE_POLLING) { + uint8_t* data; + + if (dcb->writeq != NULL) + { + data = (uint8_t *)GWBUF_DATA(dcb->writeq); + + + } + else + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_write_backend_event] Dcb %p in state %s " + "but there's nothing to write either.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)))); + rc = 1; + } + goto return_rc; + } + + dcb_drain_writeq(dcb); + rc = 1; +return_rc: + + + return rc; +} + +/* + * Write function for backend DCB. Store command to protocol. + * + * @param dcb The DCB of the backend + * @param queue Queue of buffers to write + * @return 0 on failure, 1 on success + */ +static int +gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) +{ + MySQLProtocol *backend_protocol = dcb->protocol; + int rc = 0; + + rc = dcb_write(dcb, queue); + + +return_rc: + return rc; +} + +/** + * Error event handler. + * Create error message, pass it to router's error handler and if error + * handler fails in providing enough backend servers, mark session being + * closed and call DCB close function which triggers closing router session + * and related backends (if any exists. + */ +static int gw_error_backend_event(DCB *dcb) +{ + SESSION* session; + void* rsession; + ROUTER_OBJECT* router; + ROUTER* router_instance; + GWBUF* errbuf; + bool succp; + session_state_t ses_state; + + CHK_DCB(dcb); + session = dcb->session; + CHK_SESSION(session); + rsession = session->router_session; + router = session->service->router; + router_instance = session->service->router_instance; + + /** + * Avoid running redundant error handling procedure. + * dcb_close is already called for the DCB. 
Thus, either connection is + * closed by router and COM_QUIT sent or there was an error which + * have already been handled. + */ + if (dcb->state != DCB_STATE_POLLING) + { + int error, len; + char buf[100]; + + len = sizeof(error); + + if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0) + { + if (error != 0) + { + strerror_r(error, buf, 100); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "DCB in state %s got error '%s'.", + STRDCBSTATE(dcb->state), + buf))); + } + } + return 1; + } + errbuf = mysql_create_custom_error( + 1, + 0, + "Lost connection to backend server."); + + spinlock_acquire(&session->ses_lock); + ses_state = session->state; + spinlock_release(&session->ses_lock); + + /** + * Session might be initialized when DCB already is in the poll set. + * Thus hangup can occur in the middle of session initialization. + * Only complete and successfully initialized sessions allow for + * calling error handler. + */ + while (ses_state == SESSION_STATE_READY) + { + spinlock_acquire(&session->ses_lock); + ses_state = session->state; + spinlock_release(&session->ses_lock); + } + + if (ses_state != SESSION_STATE_ROUTER_READY) + { + int error, len; + char buf[100]; + + len = sizeof(error); + if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0) + { + if (error != 0) + { + strerror_r(error, buf, 100); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error '%s' in session that is not ready for routing.", + buf))); + } + } + gwbuf_free(errbuf); + goto retblock; + } + +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend error event handling."))); +#endif + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + gwbuf_free(errbuf); + + /** + * If error handler fails it means that routing session can't continue + * and it must be closed. In success, only this DCB is closed. 
+ */ + if (!succp) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } + ss_dassert(dcb->dcb_errhandle_called); + dcb_close(dcb); + +retblock: + return 1; +} + +/* + * Create a new backend connection. + * + * This routine will connect to a backend server and it is called by dcb_connect + * in router->newSession + * + * @param backend_dcb, in, out, use - backend DCB allocated from dcb_connect + * @param server, in, use - server to connect to + * @param session, in use - current session from client DCB + * @return The socket fd on success and -1 on failure. + * If successful, returns positive fd to socket which is connected to + * backend server. Positive fd is copied to protocol and to dcb. + * If fails, fd == -1 and socket is closed. + */ +static int gw_create_backend_connection( + DCB *backend_dcb, + SERVER *server, + SESSION *session) +{ + MySQLProtocol *protocol = NULL; + int rv = -1; + int fd = -1; + + protocol = mysql_protocol_init(backend_dcb, -1); + ss_dassert(protocol != NULL); + + if (protocol == NULL) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_create_backend_connection] Failed to create " + "protocol object for backend connection.", + pthread_self()))); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Failed to create " + "protocol object for backend connection."))); + goto return_fd; + } + + /*< if succeed, fd > 0, -1 otherwise */ + rv = gw_do_connect_to_backend(server->name, server->port, &fd); + /*< Assign protocol with backend_dcb */ + backend_dcb->protocol = protocol; + + /*< Set protocol state */ + switch (rv) { + case 0: + ss_dassert(fd > 0); + protocol->fd = fd; + protocol->protocol_auth_state = MYSQL_CONNECTED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_create_backend_connection] Established " + "connection to %s:%i, protocol fd %d client " + "fd %d.", + pthread_self(), + server->name, + server->port, + protocol->fd, + 
session->client->fd))); + break; + + case 1: + ss_dassert(fd > 0); + protocol->protocol_auth_state = MYSQL_PENDING_CONNECT; + protocol->fd = fd; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_create_backend_connection] Connection " + "pending to %s:%i, protocol fd %d client fd %d.", + pthread_self(), + server->name, + server->port, + protocol->fd, + session->client->fd))); + break; + + default: + ss_dassert(fd == -1); + ss_dassert(protocol->protocol_auth_state == MYSQL_ALLOC); + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_create_backend_connection] Connection " + "failed to %s:%i, protocol fd %d client fd %d.", + pthread_self(), + server->name, + server->port, + protocol->fd, + session->client->fd))); + break; + } /*< switch */ + +return_fd: + return fd; +} + + +/** + * Error event handler. + * Create error message, pass it to router's error handler and if error + * handler fails in providing enough backend servers, mark session being + * closed and call DCB close function which triggers closing router session + * and related backends (if any exists. + * + * @param dcb The current Backend DCB + * @return 1 always + */ +static int +gw_backend_hangup(DCB *dcb) +{ + SESSION* session; + void* rsession; + ROUTER_OBJECT* router; + ROUTER* router_instance; + bool succp; + GWBUF* errbuf; + session_state_t ses_state; + + CHK_DCB(dcb); + session = dcb->session; + CHK_SESSION(session); + + rsession = session->router_session; + router = session->service->router; + router_instance = session->service->router_instance; + + errbuf = mysql_create_custom_error( + 1, + 0, + "Lost connection to backend server."); + + spinlock_acquire(&session->ses_lock); + ses_state = session->state; + spinlock_release(&session->ses_lock); + + /** + * Session might be initialized when DCB already is in the poll set. + * Thus hangup can occur in the middle of session initialization. + * Only complete and successfully initialized sessions allow for + * calling error handler. 
+ */ + while (ses_state == SESSION_STATE_READY) + { + spinlock_acquire(&session->ses_lock); + ses_state = session->state; + spinlock_release(&session->ses_lock); + } + + if (ses_state != SESSION_STATE_ROUTER_READY) + { + int error, len; + char buf[100]; + + len = sizeof(error); + if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0) + { + if (error != 0) + { + strerror_r(error, buf, 100); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Hangup in session that is not ready for routing, " + "Error reported is '%s'.", + buf))); + } + } + gwbuf_free(errbuf); + goto retblock; + } +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend hangup error handling."))); +#endif + + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + gwbuf_free(errbuf); + /** There are no required backends available, close session. */ + if (!succp) + { +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend hangup -> closing session."))); +#endif + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } + ss_dassert(dcb->dcb_errhandle_called); + dcb_close(dcb); + +retblock: + return 1; +} + +/** + * Send COM_QUIT to backend so that it can be closed. + * @param dcb The current Backend DCB + * @return 1 always + */ +static int +gw_backend_close(DCB *dcb) +{ + DCB* client_dcb; + SESSION* session; + GWBUF* quitbuf; + + CHK_DCB(dcb); + session = dcb->session; + CHK_SESSION(session); + + LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, + "%lu [gw_backend_close]", + pthread_self()))); + + quitbuf = mysql_create_com_quit(NULL, 0); + gwbuf_set_type(quitbuf, GWBUF_TYPE_MYSQL); + + /** Send COM_QUIT to the backend being closed */ + mysql_send_com_quit(dcb, 0, quitbuf); + + mysql_protocol_done(dcb); + /** + * The lock is needed only to protect the read of session->state and + * session->client values. 
Client's state may be changed by another thread + * but client's close and adding client's DCB to zombies list is executed + * only if client's DCB's state does _not_ change in parallel. + */ + spinlock_acquire(&session->ses_lock); + /** + * If session->state is STOPPING, start closing client session. + * Otherwise only this backend connection is closed. + */ + if (session != NULL && + session->state == SESSION_STATE_STOPPING && + session->client != NULL) + { + if (session->client->state == DCB_STATE_POLLING) + { + spinlock_release(&session->ses_lock); + + /** Close client DCB */ + dcb_close(session->client); + } + else + { + spinlock_release(&session->ses_lock); + } + } + else + { + spinlock_release(&session->ses_lock); + } + return 1; +} + +/** + * This routine puts the input queue into the delay queue + * The input is what backend DCB is receiving + * The routine is called from func.write() when mysql backend connection + * is not yet complete but there is input data from the client + * + * @param dcb The current backend DCB + * @param queue Input data in the GWBUF struct + */ +static void backend_set_delayqueue(DCB *dcb, GWBUF *queue) { + spinlock_acquire(&dcb->delayqlock); + + if (dcb->delayq) { + /* Append data */ + dcb->delayq = gwbuf_append(dcb->delayq, queue); + } else { + if (queue != NULL) { + /* create the delay queue */ + dcb->delayq = queue; + } + } + spinlock_release(&dcb->delayqlock); +} + +/** + * This routine writes the delayq via dcb_write + * The dcb->delayq contains data received from the client before + * mysql backend authentication succeeded + * + * @param dcb The current backend DCB + * @return The dcb_write status, or 1 when the delay queue is empty + */ +static int backend_write_delayqueue(DCB *dcb) +{ + GWBUF *localq = NULL; + int rc; + + spinlock_acquire(&dcb->delayqlock); + + if (dcb->delayq == NULL) + { + spinlock_release(&dcb->delayqlock); + rc = 1; + } + else + { + localq = dcb->delayq; + dcb->delayq = NULL; + spinlock_release(&dcb->delayqlock); + + if 
(MYSQL_IS_CHANGE_USER(((uint8_t *)GWBUF_DATA(localq)))) + { + MYSQL_session* mses; + GWBUF* new_packet; + + mses = (MYSQL_session *)dcb->session->client->data; + new_packet = gw_create_change_user_packet( + mses, + (MySQLProtocol *)dcb->protocol); + /** + * Remove previous packet which lacks scramble + * and append the new. + */ + localq = gwbuf_consume(localq, GWBUF_LENGTH(localq)); + localq = gwbuf_append(localq, new_packet); + } + rc = dcb_write(dcb, localq); + } + + if (rc == 0) + { + GWBUF* errbuf; + bool succp; + ROUTER_OBJECT *router = NULL; + ROUTER *router_instance = NULL; + void *rsession = NULL; + SESSION *session = dcb->session; + + CHK_SESSION(session); + + if (session != NULL) + { + router = session->service->router; + router_instance = session->service->router_instance; + rsession = session->router_session; +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend write delayqueue error handling."))); +#endif + errbuf = mysql_create_custom_error( + 1, + 0, + "Failed to write buffered data to back-end server. " + "Buffer was empty or back-end was disconnected during " + "operation. 
Attempting to find a new backend."); + + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + gwbuf_free(errbuf); + + if (!succp) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + ss_dassert(dcb->dcb_errhandle_called); + dcb_close(dcb); + } + } + } + return rc; +} + +/** + * This routine handles the COM_CHANGE_USER command + * + * @param backend The current backend DCB + * @param server The backend server pointer + * @param in_session The current session; its client DCB data holds the MYSQL_session + * @param queue The GWBUF containing the COM_CHANGE_USER received + * @return 1 on success and 0 on failure + */ +static int gw_change_user( + DCB *backend, + SERVER *server, + SESSION *in_session, + GWBUF *queue) +{ + MYSQL_session *current_session = NULL; + MySQLProtocol *backend_protocol = NULL; + MySQLProtocol *client_protocol = NULL; + char username[MYSQL_USER_MAXLEN+1]=""; + char database[MYSQL_DATABASE_MAXLEN+1]=""; + char current_database[MYSQL_DATABASE_MAXLEN+1]=""; + uint8_t client_sha1[MYSQL_SCRAMBLE_LEN]=""; + uint8_t *client_auth_packet = GWBUF_DATA(queue); + unsigned int auth_token_len = 0; + uint8_t *auth_token = NULL; + int rv = -1; + int auth_ret = 1; + + current_session = (MYSQL_session *)in_session->client->data; + backend_protocol = backend->protocol; + client_protocol = in_session->client->protocol; + + /* now get the user, after 4 bytes header and 1 byte command */ + client_auth_packet += 5; + strncpy(username, (char *)client_auth_packet,MYSQL_USER_MAXLEN); + client_auth_packet += strlen(username) + 1; + + /* get the auth token len */ + memcpy(&auth_token_len, client_auth_packet, 1); + + client_auth_packet++; + + /* allocate memory for token only if auth_token_len > 0 */ + if (auth_token_len > 0) { + auth_token = (uint8_t *)malloc(auth_token_len); + ss_dassert(auth_token != NULL); + + if (auth_token == NULL) + return rv; + memcpy(auth_token, 
client_auth_packet, auth_token_len); + client_auth_packet += auth_token_len; + } + + /* get new database name */ + strncpy(database, (char *)client_auth_packet,MYSQL_DATABASE_MAXLEN); + + /* get character set */ + if (strlen(database)) { + client_auth_packet += strlen(database) + 1; + } else { + client_auth_packet++; + } + + if (client_auth_packet && *client_auth_packet) + memcpy(&backend_protocol->charset, client_auth_packet, sizeof(int)); + + /* save current_database name */ + strncpy(current_database, current_session->db,MYSQL_DATABASE_MAXLEN); + + /* + * Now clear database name in dcb as we don't do local authentication on db name for change user. + * Local authentication only for user@host and if successful the database name change is sent to backend. + */ + strcpy(current_session->db, ""); + + /* + * decode the token and check the password. + * Note: if auth_token_len == 0 && auth_token == NULL, user is without password + */ + auth_ret = gw_check_mysql_scramble_data(backend->session->client, + auth_token, + auth_token_len, + client_protocol->scramble, + sizeof(client_protocol->scramble), + username, + client_sha1); + + if (auth_ret != 0) { + if (!service_refresh_users(backend->session->client->service)) { + /* Try authentication again with new repository data */ + /* Note: if no auth client authentication will fail */ + auth_ret = gw_check_mysql_scramble_data( + backend->session->client, + auth_token, auth_token_len, + client_protocol->scramble, + sizeof(client_protocol->scramble), + username, + client_sha1); + } + } + + /* copy back current datbase to client session */ + strcpy(current_session->db, current_database); + + /* let's free the auth_token now */ + if (auth_token) + free(auth_token); + + if (auth_ret != 0) { + char *password_set = NULL; + char *message = NULL; + GWBUF* buf; + + if (auth_token_len > 0) + password_set = (char *)client_sha1; + else + password_set = ""; + + /** + * Create an error message and make it look like legit reply + * from 
backend server. Then make it look like an incoming event + * so that thread gets new task of it, calls clientReply + * which filters out duplicate errors from same cause and forward + * reply to the client. + */ + message = create_auth_fail_str(username, + backend->session->client->remote, + password_set, + ""); + if (message == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Creating error message failed."))); + rv = 0; + goto retblock; + } + /** + * Add command to backend's protocol, create artificial reply + * packet and add it to client's read buffer. + */ + protocol_add_srv_command((MySQLProtocol*)backend->protocol, + MYSQL_COM_CHANGE_USER); + modutil_reply_auth_error(backend, message, 0); + rv = 1; + } else { + rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol); + /* + * Now copy new data into user session + */ + strcpy(current_session->user, username); + strcpy(current_session->db, database); + memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1)); + } + +retblock: + gwbuf_free(queue); + + return rv; +} + + +/** + * Move packets or parts of packets from readbuf to outbuf as the packet headers + * and lengths have been noticed and counted. + * Session commands need to be marked so that they can be handled properly in + * the router's clientReply. 
+ * + * @param dcb Backend's DCB where data was read from + * @param readbuf GWBUF where data was read to + * @param nbytes_to_process Number of bytes that has been read and need to be processed + * + * @return GWBUF which includes complete MySQL packet + */ +static GWBUF* process_response_data ( + DCB* dcb, + GWBUF* readbuf, + int nbytes_to_process) +{ + int npackets_left = 0; /*< response's packet count */ + ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */ + MySQLProtocol* p; + GWBUF* outbuf = NULL; + + /** Get command which was stored in gw_MySQLWrite_backend */ + p = DCB_PROTOCOL(dcb, MySQLProtocol); + if (!DCB_IS_CLONE(dcb)) CHK_PROTOCOL(p); + + /** All buffers processed here are sescmd responses */ + gwbuf_set_type(readbuf, GWBUF_TYPE_SESCMD_RESPONSE); + + /** + * Now it is known how many packets there should be and how much + * is read earlier. + */ + while (nbytes_to_process != 0) + { + mysql_server_cmd_t srvcmd; + bool succp; + + srvcmd = protocol_get_srv_command(p, false); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [process_response_data] Read command %s for DCB %p fd %d.", + pthread_self(), + STRPACKETTYPE(srvcmd), + dcb, + dcb->fd))); + /** + * Read values from protocol structure, fails if values are + * uninitialized. + */ + if (npackets_left == 0) + { + succp = protocol_get_response_status(p, &npackets_left, &nbytes_left); + + if (!succp || npackets_left == 0) + { + /** + * Examine command type and the readbuf. Conclude response + * packet count from the command type or from the first + * packet content. Fails if read buffer doesn't include + * enough data to read the packet length. + */ + init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left); + } + } + /** Only session commands with responses should be processed */ + ss_dassert(npackets_left > 0); + + /** Read incomplete packet. 
*/ + if (nbytes_left > nbytes_to_process) + { + /** Includes length info so it can be processed */ + if (nbytes_to_process >= 5) + { + /** discard source buffer */ + readbuf = gwbuf_consume(readbuf, GWBUF_LENGTH(readbuf)); + nbytes_left -= nbytes_to_process; + } + nbytes_to_process = 0; + } + /** Packet was read. All bytes belonged to the last packet. */ + else if (nbytes_left == nbytes_to_process) + { + nbytes_left = 0; + nbytes_to_process = 0; + ss_dassert(npackets_left > 0); + npackets_left -= 1; + outbuf = gwbuf_append(outbuf, readbuf); + readbuf = NULL; + } + /** + * Packet was read. There should be more since bytes were + * left over. + * Move the next packet to its own buffer and add that next + * to the prev packet's buffer. + */ + else /*< nbytes_left < nbytes_to_process */ + { + ss_dassert(nbytes_left >= 0); + nbytes_to_process -= nbytes_left; + + /** Move the prefix of the buffer to outbuf from redbuf */ + outbuf = gwbuf_append(outbuf, + gwbuf_clone_portion(readbuf, 0, (size_t)nbytes_left)); + readbuf = gwbuf_consume(readbuf, (size_t)nbytes_left); + ss_dassert(npackets_left > 0); + npackets_left -= 1; + nbytes_left = 0; + } + + /** Store new status to protocol structure */ + protocol_set_response_status(p, npackets_left, nbytes_left); + + /** A complete packet was read */ + if (nbytes_left == 0) + { + /** No more packets in this response */ + if (npackets_left == 0 && outbuf != NULL) + { + GWBUF* b = outbuf; + + while (b->next != NULL) + { + b = b->next; + } + /** Mark last as end of response */ + gwbuf_set_type(b, GWBUF_TYPE_RESPONSE_END); + + /** Archive the command */ + protocol_archive_srv_command(p); + } + /** Read next packet */ + else + { + uint8_t* data; + + /** Read next packet length */ + data = GWBUF_DATA(readbuf); + nbytes_left = MYSQL_GET_PACKET_LEN(data)+MYSQL_HEADER_LEN; + /** Store new status to protocol structure */ + protocol_set_response_status(p, npackets_left, nbytes_left); + } + } + } + return outbuf; +} + + +static bool 
sescmd_response_complete( + DCB* dcb) +{ + int npackets_left; + ssize_t nbytes_left; + MySQLProtocol* p; + bool succp; + + p = DCB_PROTOCOL(dcb, MySQLProtocol); + if (!DCB_IS_CLONE(dcb)) CHK_PROTOCOL(p); + + protocol_get_response_status(p, &npackets_left, &nbytes_left); + + if (npackets_left == 0) + { + succp = true; + } + else + { + succp = false; + } + return succp; +} diff --git a/server/modules/protocol/mongo_client.c b/server/modules/protocol/mongo_client.c new file mode 100644 index 000000000..1de7d237d --- /dev/null +++ b/server/modules/protocol/mongo_client.c @@ -0,0 +1,1004 @@ +/* + * This file is distributed as part of the MariaDB Corporation MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ * + * Copyright MariaDB Corporation Ab 2013-2015 + */ + +/** + * @file mongo_client.c + * + * Revision History + * Date Who Description + + * + */ +#include +#include +#include +#include +#include +#include + +#include + +MODULE_INFO info = { + MODULE_API_PROTOCOL, + MODULE_GA, + GWPROTOCOL_VERSION, + "The client to plain protocol implementation" +}; + +/** Defined in log_manager.cc */ +extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; + +static char *version_str = "V1.0.0"; + +static int plain_accept(DCB *listener); +static int plain_listen(DCB *listener, char *config_bind); +static int plain_read(DCB* dcb); +static int plain_write_ready(DCB *dcb); +static int plain_write(DCB *dcb, GWBUF *queue); +static int plain_client_error(DCB *dcb); +static int plain_client_close(DCB *dcb); +static int plain_client_hangup_event(DCB *dcb); + +/* + * The "module object" for the mysqld client protocol module. + */ +static GWPROTOCOL MyObject = { + plain_read, /* Read - EPOLLIN handler */ + plain_write, /* Write - data from gateway */ + plain_write_ready, /* WriteReady - EPOLLOUT handler */ + plain_client_error, /* Error - EPOLLERR handler */ + plain_client_hangup_event, /* HangUp - EPOLLHUP handler */ + plain_accept, /* Accept */ + NULL, /* Connect */ + plain_client_close, /* Close */ + plain_listen, /* Listen */ + NULL, /* Authentication */ + NULL /* Session */ +}; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. 
+ * + * @return The module object + */ +GWPROTOCOL * +GetModuleObject() +{ + return &MyObject; +} + +/** + * mysql_send_ok + * + * Send a MySQL protocol OK message to the dcb (client) + * + * @param dcb Descriptor Control Block for the connection to which the OK is sent + * @param packet_number + * @param in_affected_rows + * @param mysql_message + * @return packet length + * + */ +int +mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { + return 1; +} + +/** + * MySQLSendHandshake + * + * @param dcb The descriptor control block to use for sending the handshake request + * @return The packet length sent + */ +int +MySQLSendHandshake(DCB* dcb) +{ + return 1; +} + +/** + * gw_mysql_do_authentication + * + * Performs the MySQL protocol 4.1 authentication, using data in GWBUF *queue + * + * (MYSQL_session*)client_data including: user, db, client_sha1 are copied into + * the dcb->data and later to dcb->session->data. + * + * client_capabilitiesa are copied into the dcb->protocol + * + * @param dcb Descriptor Control Block of the client + * @param queue The GWBUF with data from client + * @return 0 If succeed, otherwise non-zero value + * + * @note in case of failure, dcb->data is freed before returning. If succeed, + * dcb->data is freed in session.c:session_free. 
+ */ +static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) { + MySQLProtocol *protocol = NULL; + /* int compress = -1; */ + int connect_with_db = -1; + uint8_t *client_auth_packet = GWBUF_DATA(queue); + int client_auth_packet_size = 0; + char *username = NULL; + char *database = NULL; + unsigned int auth_token_len = 0; + uint8_t *auth_token = NULL; + uint8_t *stage1_hash = NULL; + int auth_ret = -1; + MYSQL_session *client_data = NULL; + + CHK_DCB(dcb); + + protocol = DCB_PROTOCOL(dcb, MySQLProtocol); + CHK_PROTOCOL(protocol); + client_data = (MYSQL_session *)calloc(1, sizeof(MYSQL_session)); +#if defined(SS_DEBUG) + client_data->myses_chk_top = CHK_NUM_MYSQLSES; + client_data->myses_chk_tail = CHK_NUM_MYSQLSES; +#endif + /** + * Assign authentication structure with client DCB. + */ + dcb->data = client_data; + + stage1_hash = client_data->client_sha1; + username = client_data->user; + + client_auth_packet_size = gwbuf_length(queue); + + /* For clients supporting CLIENT_PROTOCOL_41 + * the Handshake Response Packet is: + * + * 4 bytes mysql protocol heade + * 4 bytes capability flags + * 4 max-packet size + * 1 byte character set + * string[23] reserved (all [0]) + * ... + * ... 
+ */ + + /* Detect now if there are enough bytes to continue */ + if (client_auth_packet_size < (4 + 4 + 4 + 1 + 23)) + { + return 1; + } + + memcpy(&protocol->client_capabilities, client_auth_packet + 4, 4); + + connect_with_db = + GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB & gw_mysql_get_byte4( + (uint32_t *)&protocol->client_capabilities); + /* + compress = + GW_MYSQL_CAPABILITIES_COMPRESS & gw_mysql_get_byte4( + &protocol->client_capabilities); + */ + + username = get_username_from_auth(username, client_auth_packet); + + if (username == NULL) + { + return 1; + } + + /* get charset */ + memcpy(&protocol->charset, client_auth_packet + 4 + 4 + 4, sizeof (int)); + + /* get the auth token len */ + memcpy(&auth_token_len, + client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(username) + 1, + 1); + + /* + * Note: some clients may pass empty database, connect_with_db !=0 but database ="" + */ + if (connect_with_db) { + database = client_data->db; + strncpy(database, + (char *)(client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(username) + + 1 + 1 + auth_token_len), MYSQL_DATABASE_MAXLEN); + } + + /* allocate memory for token only if auth_token_len > 0 */ + if (auth_token_len) { + auth_token = (uint8_t *)malloc(auth_token_len); + memcpy(auth_token, + client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(username) + 1 + 1, + auth_token_len); + } + + /* + * Decode the token and check the password + * Note: if auth_token_len == 0 && auth_token == NULL, user is without password + */ + + auth_ret = gw_check_mysql_scramble_data(dcb, + auth_token, + auth_token_len, + protocol->scramble, + sizeof(protocol->scramble), + username, + stage1_hash); + + /* check for database name match in resource hashtable */ + auth_ret = check_db_name_after_auth(dcb, database, auth_ret); + + /* On failed auth try to load users' table from backend database */ + if (auth_ret != 0) { + if (!service_refresh_users(dcb->service)) { + /* Try authentication again with new repository data */ + /* Note: if no auth client 
authentication will fail */ + auth_ret = gw_check_mysql_scramble_data( + dcb, + auth_token, + auth_token_len, + protocol->scramble, + sizeof(protocol->scramble), + username, + stage1_hash); + } + else + { + LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, + "%s: login attempt for user %s, user not " + "found.", + dcb->service->name, username))); + } + } + + /* Do again the database check */ + auth_ret = check_db_name_after_auth(dcb, database, auth_ret); + + /* on succesful auth set user into dcb field */ + if (auth_ret == 0) { + dcb->user = strdup(client_data->user); + } + + /* let's free the auth_token now */ + if (auth_token) { + free(auth_token); + } + + return auth_ret; +} + +/** + * Write function for client DCB: writes data from MaxScale to Client + * + * @param dcb The DCB of the client + * @param queue Queue of buffers to write + */ +int +plain_write(DCB *dcb, GWBUF *queue) +{ + return dcb_write(dcb, queue); +} + +/** + * Client read event triggered by EPOLLIN + * + * @param dcb Descriptor control block + * @return 0 if succeed, 1 otherwise + */ +int plain_read( + DCB* dcb) +{ + SESSION *session = NULL; + ROUTER_OBJECT *router = NULL; + ROUTER *router_instance = NULL; + void *rsession = NULL; + MySQLProtocol *protocol = NULL; + GWBUF *read_buffer = NULL; + int rc = 0; + int nbytes_read = 0; + uint8_t cap = 0; + bool stmt_input = false; /*< router input type */ + + CHK_DCB(dcb); + protocol = DCB_PROTOCOL(dcb, MySQLProtocol); + CHK_PROTOCOL(protocol); + rc = dcb_read(dcb, &read_buffer); + + if (rc < 0) + { + dcb_close(dcb); + } + nbytes_read = gwbuf_length(read_buffer); + + if (nbytes_read == 0) + { + goto return_rc; + } + + if(dcb->session == NULL) + { + dcb->session = session_alloc(dcb->service,dcb); + } + + rc = SESSION_ROUTE_QUERY(dcb->session, read_buffer); + + +return_rc: + + return rc; +} + +/////////////////////////////////////////////// +// client write event to Client triggered by EPOLLOUT +////////////////////////////////////////////// +/** + * @node 
Client's fd became writable, and EPOLLOUT event + * arrived. As a consequence, client input buffer (writeq) is flushed. + * + * Parameters: + * @param dcb - in, use + * client dcb + * + * @return constantly 1 + * + * + * @details (write detailed description here) + * + */ +int plain_write_ready(DCB *dcb) +{ + MySQLProtocol *protocol = NULL; + + CHK_DCB(dcb); + + ss_dassert(dcb->state != DCB_STATE_DISCONNECTED); + + if (dcb == NULL) { + goto return_1; + } + + if (dcb->state == DCB_STATE_DISCONNECTED) { + goto return_1; + } + + if (dcb->protocol == NULL) { + goto return_1; + } + protocol = (MySQLProtocol *)dcb->protocol; + + + dcb_drain_writeq(dcb); + goto return_1; + +return_1: + + return 1; +} + +/** + * set listener for mysql protocol, retur 1 on success and 0 in failure + */ +int plain_listen( + DCB *listen_dcb, + char *config_bind) +{ + int l_so; + int syseno = 0; + struct sockaddr_in serv_addr; + struct sockaddr_un local_addr; + struct sockaddr *current_addr; + int one = 1; + int rc; + + if (strchr(config_bind, '/')) { + char *tmp = strrchr(config_bind, ':'); + if (tmp) + *tmp = '\0'; + + // UNIX socket create + if ((l_so = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, + "\n* Error: can't create UNIX socket due " + "error %i, %s.\n\n\t", + errno, + strerror(errno)); + return 0; + } + memset(&local_addr, 0, sizeof(local_addr)); + local_addr.sun_family = AF_UNIX; + strncpy(local_addr.sun_path, config_bind, sizeof(local_addr.sun_path) - 1); + + current_addr = (struct sockaddr *) &local_addr; + + } else { + /* MaxScale, as default, will bind on port 4406 */ + if (!parse_bindconfig(config_bind, 4406, &serv_addr)) { + fprintf(stderr, "Error in parse_bindconfig for [%s]\n", config_bind); + return 0; + } + // TCP socket create + if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + fprintf(stderr, + "\n* Error: can't create socket due " + "error %i, %s.\n\n\t", + errno, + strerror(errno)); + return 0; + } + + current_addr = (struct sockaddr *) &serv_addr; + 
} + + listen_dcb->fd = -1; + + // socket options + if((syseno = setsockopt(l_so, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one))) != 0){ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to set socket options. Error %d: %s",errno,strerror(errno)))); + } + + + // set NONBLOCKING mode + setnonblocking(l_so); + + /* get the right socket family for bind */ + switch (current_addr->sa_family) { + case AF_UNIX: + rc = unlink(config_bind); + if ( (rc == -1) && (errno!=ENOENT) ) { + fprintf(stderr, "Error unlink Unix Socket %s\n", config_bind); + } + + if (bind(l_so, (struct sockaddr *) &local_addr, sizeof(local_addr)) < 0) { + fprintf(stderr, + "\n* Bind failed due error %i, %s.\n", + errno, + strerror(errno)); + fprintf(stderr, "* Can't bind to %s\n\n", config_bind); + close(l_so); + return 0; + } + + /* set permission for all users */ + if (chmod(config_bind, 0777) < 0) { + fprintf(stderr, + "\n* chmod failed for %s due error %i, %s.\n\n", + config_bind, + errno, + strerror(errno)); + } + + break; + + case AF_INET: + if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + fprintf(stderr, + "\n* Bind failed due error %i, %s.\n", + errno, + strerror(errno)); + fprintf(stderr, "* Can't bind to %s\n\n", config_bind); + close(l_so); + return 0; + } + break; + + default: + fprintf(stderr, "* Socket Family %i not supported\n", current_addr->sa_family); + close(l_so); + return 0; + } + + rc = listen(l_so, 10 * SOMAXCONN); + + if (rc == 0) { + LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,"Listening MySQL connections at %s", config_bind))); + } else { + int eno = errno; + errno = 0; + fprintf(stderr, + "\n* Failed to start listening MySQL due error %d, %s\n\n", + eno, + strerror(eno)); + close(l_so); + return 0; + } + // assign l_so to dcb + listen_dcb->fd = l_so; + + // add listening socket to poll structure + if (poll_add_dcb(listen_dcb) == -1) { + fprintf(stderr, + "\n* Failed to start polling the socket due error " + "%i, %s.\n\n", + 
errno, + strerror(errno)); + return 0; + } +#if defined(FAKE_CODE) + conn_open[l_so] = true; +#endif /* FAKE_CODE */ + listen_dcb->func.accept = plain_accept; + + return 1; +} + + +/** + * @node (write brief function description here) + * + * Parameters: + * @param listener - + * + * + * @return 0 in success, 1 in failure + * + * + * @details (write detailed description here) + * + */ +int plain_accept(DCB *listener) +{ + int rc = 0; + DCB *client_dcb; + MySQLProtocol *protocol; + int c_sock; + struct sockaddr client_conn; + socklen_t client_len = sizeof(struct sockaddr_storage); + int sendbuf = GW_BACKEND_SO_SNDBUF; + socklen_t optlen = sizeof(sendbuf); + int eno = 0; + int syseno = 0; + int i = 0; + + CHK_DCB(listener); + + while (1) { + + retry_accept: + +#if defined(FAKE_CODE) + if (fail_next_accept > 0) + { + c_sock = -1; + eno = fail_accept_errno; + fail_next_accept -= 1; + } else { + fail_accept_errno = 0; +#endif /* FAKE_CODE */ + // new connection from client + c_sock = accept(listener->fd, + (struct sockaddr *) &client_conn, + &client_len); + eno = errno; + errno = 0; +#if defined(FAKE_CODE) + } +#endif /* FAKE_CODE */ + + if (c_sock == -1) { + + if (eno == EAGAIN || eno == EWOULDBLOCK) + { + /** + * We have processed all incoming connections. + */ + rc = 1; + goto return_rc; + } + else if (eno == ENFILE || eno == EMFILE) + { + struct timespec ts1; + ts1.tv_sec = 0; + /** + * Exceeded system's (ENFILE) or processes + * (EMFILE) max. number of files limit. + */ + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [plain_accept] Error %d, %s. ", + pthread_self(), + eno, + strerror(eno)))); + + if (i == 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error %d, %s. " + "Failed to accept new client " + "connection.", + eno, + strerror(eno)))); + } + i++; + ts1.tv_nsec = 100*i*i*1000000; + nanosleep(&ts1, NULL); + + if (i<10) { + goto retry_accept; + } + rc = 1; + goto return_rc; + } + else + { + /** + * Other error. 
+ */ + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [plain_accept] Error %d, %s.", + pthread_self(), + eno, + strerror(eno)))); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to accept new client " + "connection due to %d, %s.", + eno, + strerror(eno)))); + rc = 1; + goto return_rc; + } /* if (eno == ..) */ + } /* if (c_sock == -1) */ + /* reset counter */ + i = 0; + + listener->stats.n_accepts++; +#if defined(SS_DEBUG) + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [plain_accept] Accepted fd %d.", + pthread_self(), + c_sock))); +#endif /* SS_DEBUG */ +#if defined(FAKE_CODE) + conn_open[c_sock] = true; +#endif /* FAKE_CODE */ + /* set nonblocking */ + sendbuf = GW_CLIENT_SO_SNDBUF; + + if((syseno = setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen)) != 0){ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to set socket options. Error %d: %s",errno,strerror(errno)))); + } + + sendbuf = GW_CLIENT_SO_RCVBUF; + + if((syseno = setsockopt(c_sock, SOL_SOCKET, SO_RCVBUF, &sendbuf, optlen)) != 0){ + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,"Error: Failed to set socket options. 
Error %d: %s",errno,strerror(errno)))); + } + setnonblocking(c_sock); + + client_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER); + + if (client_dcb == NULL) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to create " + "DCB object for client connection."))); + close(c_sock); + rc = 1; + goto return_rc; + } + + client_dcb->service = listener->session->service; + client_dcb->fd = c_sock; + + // get client address + if ( client_conn.sa_family == AF_UNIX) + { + // client address + client_dcb->remote = strdup("localhost_from_socket"); + // set localhost IP for user authentication + (client_dcb->ipv4).sin_addr.s_addr = 0x0100007F; + } + else + { + /* client IPv4 in raw data*/ + memcpy(&client_dcb->ipv4, + (struct sockaddr_in *)&client_conn, + sizeof(struct sockaddr_in)); + /* client IPv4 in string representation */ + client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char)); + + if (client_dcb->remote != NULL) + { + inet_ntop(AF_INET, + &(client_dcb->ipv4).sin_addr, + client_dcb->remote, + INET_ADDRSTRLEN); + } + } + protocol = mysql_protocol_init(client_dcb, c_sock); + ss_dassert(protocol != NULL); + + if (protocol == NULL) { + /** delete client_dcb */ + dcb_close(client_dcb); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [plain_accept] Failed to create " + "protocol object for client connection.", + pthread_self()))); + rc = 1; + goto return_rc; + } + client_dcb->protocol = protocol; + // assign function poiters to "func" field + memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL)); + + // client protocol state change + protocol->protocol_auth_state = MYSQL_IDLE; + + /** + * Set new descriptor to event set. At the same time, + * change state to DCB_STATE_POLLING so that + * thread which wakes up sees correct state. 
+ */ + if (poll_add_dcb(client_dcb) == -1) + { + /* Send a custom error as MySQL command reply */ + mysql_send_custom_error( + client_dcb, + 1, + 0, + "MaxScale internal error."); + + /** close client_dcb */ + dcb_close(client_dcb); + + /** Previous state is recovered in poll_add_dcb. */ + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [plain_accept] Failed to add dcb %p for " + "fd %d to epoll set.", + pthread_self(), + client_dcb, + client_dcb->fd))); + rc = 1; + goto return_rc; + } + else + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [plain_accept] Added dcb %p for fd " + "%d to epoll set.", + pthread_self(), + client_dcb, + client_dcb->fd))); + } + } /**< while 1 */ +#if defined(SS_DEBUG) + if (rc == 0) { + CHK_DCB(client_dcb); + CHK_PROTOCOL(((MySQLProtocol *)client_dcb->protocol)); + } +#endif +return_rc: + + return rc; +} + +static int plain_client_error( + DCB* dcb) +{ + SESSION* session; + + CHK_DCB(dcb); + + session = dcb->session; + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [plain_client_error] Error event handling for DCB %p " + "in state %s, session %p.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + (session != NULL ? session : NULL)))); + + if (session != NULL && session->state == SESSION_STATE_STOPPING) + { + goto retblock; + } + + dcb_close(dcb); + +retblock: + return 1; +} + +static int +plain_client_close(DCB *dcb) +{ + SESSION* session; + ROUTER_OBJECT* router; + void* router_instance; +#if defined(SS_DEBUG) + MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; + if (dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + if (!DCB_IS_CLONE(dcb)) CHK_PROTOCOL(protocol); + } +#endif + LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, + "%lu [plain_client_close]", + pthread_self()))); + mysql_protocol_done(dcb); + session = dcb->session; + /** + * session may be NULL if session_alloc failed. + * In that case, router session wasn't created. 
+ */ + if (session != NULL) + { + CHK_SESSION(session); + spinlock_acquire(&session->ses_lock); + + if (session->state != SESSION_STATE_STOPPING) + { + session->state = SESSION_STATE_STOPPING; + } + router_instance = session->service->router_instance; + router = session->service->router; + /** + * If router session is being created concurrently router + * session might be NULL and it shouldn't be closed. + */ + if (session->router_session != NULL) + { + spinlock_release(&session->ses_lock); + /** Close router session and all its connections */ + router->closeSession(router_instance, session->router_session); + } + else + { + spinlock_release(&session->ses_lock); + } + } + return 1; +} + +/** + * Handle a hangup event on the client side descriptor. + * + * We simply close the DCB, this will propogate the closure to any + * backend descriptors and perform the session cleanup. + * + * @param dcb The DCB of the connection + */ +static int +plain_client_hangup_event(DCB *dcb) +{ + SESSION* session; + + CHK_DCB(dcb); + session = dcb->session; + + if (session != NULL && session->state == SESSION_STATE_ROUTER_READY) + { + CHK_SESSION(session); + } + + if (session != NULL && session->state == SESSION_STATE_STOPPING) + { + goto retblock; + } + + dcb_close(dcb); + +retblock: + return 1; +} + + +/** + * Detect if buffer includes partial mysql packet or multiple packets. + * Store partial packet to dcb_readqueue. Send complete packets one by one + * to router. + * + * It is assumed readbuf includes at least one complete packet. + * Return 1 in success. If the last packet is incomplete return success but + * leave incomplete packet to readbuf. 
+ * + * @param session Session pointer + * @param p_readbuf Pointer to the address of GWBUF including the query + * + * @return 1 if succeed, + */ +static int route_by_statement( + SESSION* session, + GWBUF** p_readbuf) +{ + int rc; + GWBUF* packetbuf; +#if defined(SS_DEBUG) + GWBUF* tmpbuf; + + tmpbuf = *p_readbuf; + while (tmpbuf != NULL) + { + ss_dassert(GWBUF_IS_TYPE_MYSQL(tmpbuf)); + tmpbuf=tmpbuf->next; + } +#endif + do + { + ss_dassert(GWBUF_IS_TYPE_MYSQL((*p_readbuf))); + + /** + * Collect incoming bytes to a buffer until complete packet has + * arrived and then return the buffer. + */ + packetbuf = gw_MySQL_get_next_packet(p_readbuf); + + if (packetbuf != NULL) + { + CHK_GWBUF(packetbuf); + ss_dassert(GWBUF_IS_TYPE_MYSQL(packetbuf)); + /** + * This means that buffer includes exactly one MySQL + * statement. + * backend func.write uses the information. MySQL backend + * protocol, for example, stores the command identifier + * to protocol structure. When some other thread reads + * the corresponding response the command tells how to + * handle response. + * + * Set it here instead of plain_read to make + * sure it is set to each (MySQL) packet. 
+ */ + gwbuf_set_type(packetbuf, GWBUF_TYPE_SINGLE_STMT); + /** Route query */ + rc = SESSION_ROUTE_QUERY(session, packetbuf); + } + else + { + rc = 1; + goto return_rc; + } + } + while (rc == 1 && *p_readbuf != NULL); + +return_rc: + return rc; +} + diff --git a/server/modules/routing/CMakeLists.txt b/server/modules/routing/CMakeLists.txt index 4cf341d83..a14ca00ad 100644 --- a/server/modules/routing/CMakeLists.txt +++ b/server/modules/routing/CMakeLists.txt @@ -9,6 +9,10 @@ add_library(readconnroute SHARED readconnroute.c) target_link_libraries(readconnroute log_manager utils) install(TARGETS readconnroute DESTINATION modules) +add_library(plainroute SHARED plainroute.c) +target_link_libraries(plainroute log_manager utils) +install(TARGETS plainroute DESTINATION modules) + add_library(debugcli SHARED debugcli.c debugcmd.c) target_link_libraries(debugcli log_manager utils) install(TARGETS debugcli DESTINATION modules) diff --git a/server/modules/routing/plainroute.c b/server/modules/routing/plainroute.c new file mode 100644 index 000000000..245a3427a --- /dev/null +++ b/server/modules/routing/plainroute.c @@ -0,0 +1,923 @@ +/* + * This file is distributed as part of the MariaDB Corporation MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
 *
 * Copyright MariaDB Corporation Ab 2013-2014
 */

/**
 * @file plainroute.c - Read Connection Load Balancing Query Router
 *
 * This is the implementation of a simple query router that balances
 * read connections. It assumes the service is configured with a set
 * of slaves and that the application clients already split read and write
 * queries. It offers a service to balance the client read connections
 * over this set of slave servers. It does this once only, at the time
 * the connection is made. It chooses the server that currently has the least
 * number of connections by keeping a count for each server of how
 * many connections the query router has made to the server.
 *
 * When two servers have the same number of current connections the one with
 * the least number of connections since startup will be used.
 *
 * The router may also have options associated to it that will limit the
 * choice of backend server. Currently two options are supported, the "master"
 * option will cause the router to only connect to servers marked as masters
 * and the "slave" option will limit connections to servers that are marked
 * as slaves. If neither option is specified the router will connect to either
 * masters or slaves.
 *
 * @verbatim
 * Revision History
 *
 * Date         Who                     Description
 * 14/06/2013   Mark Riddoch            Initial implementation
 * 25/06/2013   Mark Riddoch            Addition of checks for current server state
 * 26/06/2013   Mark Riddoch            Use server with least connections since
 *                                      startup if the number of current
 *                                      connections is the same for two servers
 *                                      Addition of master and slave options
 * 27/06/2013   Vilho Raatikka          Added skygw_log_write command as an example
 *                                      and necessary headers.
+ * 17/07/2013 Massimiliano Pinto Added clientReply routine: + * called by backend server to send data to client + * Included mysql_client_server_protocol.h + * with macros and MySQL commands with MYSQL_ prefix + * avoiding any conflict with the standard ones + * in mysql.h + * 22/07/2013 Mark Riddoch Addition of joined router option for Galera + * clusters + * 31/07/2013 Massimiliano Pinto Added a check for candidate server, if NULL return + * 12/08/2013 Mark Riddoch Log unsupported router options + * 04/09/2013 Massimiliano Pinto Added client NULL check in clientReply + * 22/10/2013 Massimiliano Pinto errorReply called from backend, for client error reply + * or take different actions such as open a new backend connection + * 20/02/2014 Massimiliano Pinto If router_options=slave, route traffic to master if no slaves available + * 06/03/2014 Massimiliano Pinto Server connection counter is now updated in closeSession + * 24/06/2014 Massimiliano Pinto New rules for selecting the Master server + * 27/06/2014 Mark Riddoch Addition of server weighting + * + * @endverbatim + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +/** Defined in log_manager.cc */ +extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; + +MODULE_INFO info = { + MODULE_API_ROUTER, + MODULE_GA, + ROUTER_VERSION, + "A connection based router to load balance based on connections" +}; + +static char *version_str = "V1.1.0"; + +/* The router entry points */ +static ROUTER *createInstance(SERVICE *service, char **options); +static void *newSession(ROUTER *instance, SESSION *session); +static void closeSession(ROUTER *instance, void *router_session); +static void freeSession(ROUTER *instance, void *router_session); +static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue); +static void diagnostics(ROUTER 
*instance, DCB *dcb); +static void clientReply( + ROUTER *instance, + void *router_session, + GWBUF *queue, + DCB *backend_dcb); +static void handleError( + ROUTER *instance, + void *router_session, + GWBUF *errbuf, + DCB *backend_dcb, + error_action_t action, + bool *succp); +static uint8_t getCapabilities (ROUTER* inst, void* router_session); + + +/** The module object definition */ +static ROUTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + routeQuery, + diagnostics, + clientReply, + handleError, + getCapabilities +}; + +static bool rses_begin_locked_router_action( + ROUTER_CLIENT_SES* rses); + +static void rses_end_locked_router_action( + ROUTER_CLIENT_SES* rses); + +static BACKEND *get_root_master( + BACKEND **servers); +static int handle_state_switch( + DCB* dcb,DCB_REASON reason, void * routersession); +static SPINLOCK instlock; +static ROUTER_INSTANCE *instances; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "Initialise readconnroute router module %s.\n", version_str))); + spinlock_init(&instlock); + instances = NULL; +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +ROUTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the router for a particular service + * within the gateway. 
+ * + * @param service The service this router is being create for + * @param options An array of options for this query router + * + * @return The instance data for this new instance + */ +static ROUTER * +createInstance(SERVICE *service, char **options) +{ +ROUTER_INSTANCE *inst; +SERVER *server; +SERVER_REF *sref; +int i, n; +BACKEND *backend; +char *weightby; + + if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { + return NULL; + } + + inst->service = service; + spinlock_init(&inst->lock); + + /* + * We need an array of the backend servers in the instance structure so + * that we can maintain a count of the number of connections to each + * backend server. + */ + for (sref = service->dbref, n = 0; sref; sref = sref->next) + n++; + + inst->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); + if (!inst->servers) + { + free(inst); + return NULL; + } + + for (sref = service->dbref, n = 0; sref; sref = sref->next) + { + if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL) + { + for (i = 0; i < n; i++) + free(inst->servers[i]); + free(inst->servers); + free(inst); + return NULL; + } + inst->servers[n]->server = sref->server; + inst->servers[n]->current_connection_count = 0; + inst->servers[n]->weight = 1000; + n++; + } + inst->servers[n] = NULL; + + if ((weightby = serviceGetWeightingParameter(service)) != NULL) + { + int total = 0; + for (n = 0; inst->servers[n]; n++) + { + backend = inst->servers[n]; + total += atoi(serverGetParameter(backend->server, + weightby)); + } + if (total == 0) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "WARNING: Weighting Parameter for service '%s' " + "will be ignored as no servers have values " + "for the parameter '%s'.\n", + service->name, weightby))); + } + else + { + for (n = 0; inst->servers[n]; n++) + { + int perc, wght; + backend = inst->servers[n]; + perc = ((wght = atoi(serverGetParameter(backend->server, + weightby))) * 1000) / total; + if (perc == 0 && wght != 0) + perc = 1; + backend->weight = perc; 
+ if (perc == 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Server '%s' has no value " + "for weighting parameter '%s', " + "no queries will be routed to " + "this server.\n", + inst->servers[n]->server->unique_name, + weightby))); + } + + } + } + } + + /* + * Process the options + */ + inst->bitmask = 0; + inst->bitvalue = 0; + if (options) + { + for (i = 0; options[i]; i++) + { + if (!strcasecmp(options[i], "master")) + { + inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE); + inst->bitvalue |= SERVER_MASTER; + } + else if (!strcasecmp(options[i], "slave")) + { + inst->bitmask |= (SERVER_MASTER|SERVER_SLAVE); + inst->bitvalue |= SERVER_SLAVE; + } + else if (!strcasecmp(options[i], "synced")) + { + inst->bitmask |= (SERVER_JOINED); + inst->bitvalue |= SERVER_JOINED; + } + else if (!strcasecmp(options[i], "ndb")) + { + inst->bitmask |= (SERVER_NDB); + inst->bitvalue |= SERVER_NDB; + } + else + { + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Unsupported router " + "option \'%s\' for readconnroute. " + "Expected router options are " + "[slave|master|synced|ndb]", + options[i]))); + } + } + } + + /* + * We have completed the creation of the instance data, so now + * insert this router instance into the linked list of routers + * that have been created with this module. + */ + spinlock_acquire(&instlock); + inst->next = instances; + instances = inst; + spinlock_release(&instlock); + + return (ROUTER *)inst; +} + +/** + * Associate a new session with this instance of the router. 
+ * + * @param instance The router instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(ROUTER *instance, SESSION *session) +{ +ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; +ROUTER_CLIENT_SES *client_rses; +BACKEND *candidate = NULL; +int i; + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [newSession] new router session with session " + "%p, and inst %p.", + pthread_self(), + session, + inst))); + + + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); + + if (client_rses == NULL) { + return NULL; + } + +#if defined(SS_DEBUG) + client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; + client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; +#endif + + + /** + * Find a backend server to connect to. This is the extent of the + * load balancing algorithm we need to implement for this simple + * connection router. + */ + + /* + * Loop over all the servers and find any that have fewer connections + * than the candidate server. + * + * If a server has less connections than the current candidate we mark this + * as the new candidate to connect to. + * + * If a server has the same number of connections currently as the candidate + * and has had less connections over time than the candidate it will also + * become the new candidate. This has the effect of spreading the + * connections over different servers during periods of very low load. 
+ */ + for (i = 0; inst->servers[i]; i++) { + + + if (SERVER_IN_MAINT(inst->servers[i]->server)) + continue; + + if (inst->servers[i]->weight == 0) + continue; + + + + /* If no candidate set, set first running server as + our initial candidate server */ + if(candidate == NULL) + { + candidate = inst->servers[i]; + } + else if((inst->servers[i]->current_connection_count + * 1000) / inst->servers[i]->weight < + (candidate->current_connection_count * + 1000) / candidate->weight) + { + /* This running server has fewer + connections, set it as a new candidate */ + candidate = inst->servers[i]; + } + else if((inst->servers[i]->current_connection_count + * 1000) / inst->servers[i]->weight == + (candidate->current_connection_count * + 1000) / candidate->weight && + inst->servers[i]->server->stats.n_connections < + candidate->server->stats.n_connections) + { + /* This running server has the same number + of connections currently as the candidate + but has had fewer connections over time + than candidate, set this server to candidate*/ + candidate = inst->servers[i]; + } + } + + + + client_rses->rses_capabilities = RCAP_TYPE_PACKET_INPUT; + + /* + * We now have the server with the least connections. + * Bump the connection count for this server + */ + atomic_add(&candidate->current_connection_count, 1); + client_rses->backend = candidate; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [newSession] Selected server in port %d. 
" + "Connections : %d\n", + pthread_self(), + candidate->server->port, + candidate->current_connection_count))); + /* + * Open a backend connection, putting the DCB for this + * connection in the client_rses->backend_dcb + */ + client_rses->backend_dcb = dcb_connect(candidate->server, + session, + candidate->server->protocol); + if (client_rses->backend_dcb == NULL) + { + atomic_add(&candidate->current_connection_count, -1); + free(client_rses); + return NULL; + } + dcb_add_callback( + client_rses->backend_dcb, + DCB_REASON_NOT_RESPONDING, + &handle_state_switch, + client_rses); + inst->stats.n_sessions++; + + /** + * Add this session to the list of active sessions. + */ + spinlock_acquire(&inst->lock); + client_rses->next = inst->connections; + inst->connections = client_rses; + spinlock_release(&inst->lock); + + + return (void *)client_rses; +} + +/** + * @node Unlink from backend server, unlink from router's connection list, + * and free memory of a router client session. + * + * Parameters: + * @param router - + * + * + * @param router_cli_ses - + * + * + * @return void + * + * + * @details (write detailed description here) + * + */ +static void freeSession( + ROUTER* router_instance, + void* router_client_ses) +{ + ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_instance; + ROUTER_CLIENT_SES* router_cli_ses = + (ROUTER_CLIENT_SES *)router_client_ses; + int prev_val; + + prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1); + ss_dassert(prev_val > 0); + + spinlock_acquire(&router->lock); + + if (router->connections == router_cli_ses) { + router->connections = router_cli_ses->next; + } else { + ROUTER_CLIENT_SES *ptr = router->connections; + + while (ptr != NULL && ptr->next != router_cli_ses) { + ptr = ptr->next; + } + + if (ptr != NULL) { + ptr->next = router_cli_ses->next; + } + } + spinlock_release(&router->lock); + + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [freeSession] Unlinked router_client_session %p from " 
+ "router %p and from server on port %d. Connections : %d. ", + pthread_self(), + router_cli_ses, + router, + router_cli_ses->backend->server->port, + prev_val-1))); + + free(router_cli_ses); +} + + +/** + * Close a session with the router, this is the mechanism + * by which a router may cleanup data structure etc. + * + * @param instance The router instance data + * @param router_session The session being closed + */ +static void +closeSession(ROUTER *instance, void *router_session) +{ +ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; +DCB* backend_dcb; + + CHK_CLIENT_RSES(router_cli_ses); + /** + * Lock router client session for secure read and update. + */ + if (rses_begin_locked_router_action(router_cli_ses)) + { + /* decrease server current connection counter */ + atomic_add(&router_cli_ses->backend->server->stats.n_current, -1); + + backend_dcb = router_cli_ses->backend_dcb; + router_cli_ses->backend_dcb = NULL; + router_cli_ses->rses_closed = true; + /** Unlock */ + rses_end_locked_router_action(router_cli_ses); + + /** + * Close the backend server connection + */ + if (backend_dcb != NULL) { + CHK_DCB(backend_dcb); + dcb_close(backend_dcb); + } + } +} + +/** + * We have data from the client, we must route it to the backend. + * This is simply a case of sending it to the connection that was + * chosen when we started the client session. 
+ * + * @param instance The router instance + * @param router_session The router session returned from the newSession call + * @param queue The queue of data buffers to route + * @return if succeed 1, otherwise 0 + */ +static int +routeQuery(ROUTER *instance, void *router_session, GWBUF *queue) +{ + ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance; + ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session; + uint8_t *payload = GWBUF_DATA(queue); + int mysql_command; + int rc; + DCB* backend_dcb; + bool rses_is_closed; + + inst->stats.n_queries++; + + /** Dirty read for quick check if router is closed. */ + if (router_cli_ses->rses_closed) + { + rses_is_closed = true; + } + else + { + /** + * Lock router client session for secure read of DCBs + */ + rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses)); + } + + if (!rses_is_closed) + { + backend_dcb = router_cli_ses->backend_dcb; + /** unlock */ + rses_end_locked_router_action(router_cli_ses); + } + + if (rses_is_closed || backend_dcb == NULL) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Error : Failed to route MySQL command %d to backend " + "server.", + mysql_command))); + rc = 0; + goto return_rc; + } + + + rc = backend_dcb->func.write(backend_dcb, queue); + +return_rc: + return rc; +} + +/** + * Display router diagnostics + * + * @param instance Instance of the router + * @param dcb DCB to send diagnostics to + */ +static void +diagnostics(ROUTER *router, DCB *dcb) +{ +ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router; +ROUTER_CLIENT_SES *session; +int i = 0; +BACKEND *backend; +char *weightby; + + spinlock_acquire(&router_inst->lock); + session = router_inst->connections; + while (session) + { + i++; + session = session->next; + } + spinlock_release(&router_inst->lock); + + dcb_printf(dcb, "\tNumber of router sessions: %d\n", + router_inst->stats.n_sessions); + dcb_printf(dcb, "\tCurrent no. 
of router sessions: %d\n", i); + dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", + router_inst->stats.n_queries); + if ((weightby = serviceGetWeightingParameter(router_inst->service)) + != NULL) + { + dcb_printf(dcb, "\tConnection distribution based on %s " + "server parameter.\n", + weightby); + dcb_printf(dcb, + "\t\tServer Target %% Connections\n"); + for (i = 0; router_inst->servers[i]; i++) + { + backend = router_inst->servers[i]; + dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n", + backend->server->unique_name, + (float)backend->weight / 10, + backend->current_connection_count); + } + + } +} + +/** + * Client Reply routine + * + * The routine will reply to client data from backend server + * + * @param instance The router instance + * @param router_session The router session + * @param backend_dcb The backend DCB + * @param queue The GWBUF with reply data + */ +static void +clientReply( + ROUTER *instance, + void *router_session, + GWBUF *queue, + DCB *backend_dcb) +{ + DCB *client ; + + client = backend_dcb->session->client; + + ss_dassert(client != NULL); + + SESSION_ROUTE_REPLY(backend_dcb->session, queue); +} + +/** + * Error Handler routine + * + * The routine will handle errors that occurred in backend writes. 
+ * + * @param instance The router instance + * @param router_session The router session + * @param message The error message to reply + * @param backend_dcb The backend DCB + * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION + * + */ +static void handleError( + ROUTER *instance, + void *router_session, + GWBUF *errbuf, + DCB *backend_dcb, + error_action_t action, + bool *succp) + +{ + DCB *client_dcb; + SESSION *session = backend_dcb->session; + session_state_t sesstate; + + /** Reset error handle flag from a given DCB */ + if (action == ERRACT_RESET) + { + backend_dcb->dcb_errhandle_called = false; + return; + } + + /** Don't handle same error twice on same DCB */ + if (backend_dcb->dcb_errhandle_called) + { + /** we optimistically assume that previous call succeed */ + *succp = true; + return; + } + else + { + backend_dcb->dcb_errhandle_called = true; + } + spinlock_acquire(&session->ses_lock); + sesstate = session->state; + client_dcb = session->client; + + if (sesstate == SESSION_STATE_ROUTER_READY) + { + CHK_DCB(client_dcb); + spinlock_release(&session->ses_lock); + client_dcb->func.write(client_dcb, gwbuf_clone(errbuf)); + } + else + { + spinlock_release(&session->ses_lock); + } + + /** false because connection is not available anymore */ + *succp = false; +} + +/** to be inline'd */ +/** + * @node Acquires lock to router client session if it is not closed. + * + * Parameters: + * @param rses - in, use + * + * + * @return true if router session was not closed. If return value is true + * it means that router is locked, and must be unlocked later. False, if + * router was closed before lock was acquired. 
+ * + * + * @details (write detailed description here) + * + */ +static bool rses_begin_locked_router_action( + ROUTER_CLIENT_SES* rses) +{ + bool succp = false; + + CHK_CLIENT_RSES(rses); + + if (rses->rses_closed) { + goto return_succp; + } + spinlock_acquire(&rses->rses_lock); + if (rses->rses_closed) { + spinlock_release(&rses->rses_lock); + goto return_succp; + } + succp = true; + +return_succp: + return succp; +} + +/** to be inline'd */ +/** + * @node Releases router client session lock. + * + * Parameters: + * @param rses - + * + * + * @return void + * + * + * @details (write detailed description here) + * + */ +static void rses_end_locked_router_action( + ROUTER_CLIENT_SES* rses) +{ + CHK_CLIENT_RSES(rses); + spinlock_release(&rses->rses_lock); +} + + +static uint8_t getCapabilities( + ROUTER* inst, + void* router_session) +{ + return 0; +} + +/******************************** + * This routine returns the root master server from MySQL replication tree + * Get the root Master rule: + * + * find server with the lowest replication depth level + * and the SERVER_MASTER bitval + * Servers are checked even if they are in 'maintenance' + * + * @param servers The list of servers + * @return The Master found + * + */ + +static BACKEND *get_root_master(BACKEND **servers) { + int i = 0; + BACKEND *master_host = NULL; + + for (i = 0; servers[i]; i++) { + if (servers[i] && (servers[i]->server->status & (SERVER_MASTER|SERVER_MAINT)) == SERVER_MASTER) { + if (master_host && servers[i]->server->depth < master_host->server->depth) { + master_host = servers[i]; + } else { + if (master_host == NULL) { + master_host = servers[i]; + } + } + } + } + return master_host; +} + +static int handle_state_switch(DCB* dcb,DCB_REASON reason, void * routersession) +{ + ss_dassert(dcb != NULL); + SESSION* session = dcb->session; + ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES*)routersession; + SERVICE* service = session->service; + ROUTER* router = (ROUTER *)service->router; + + 
switch(reason) + { + case DCB_REASON_CLOSE: + dcb->func.close(dcb); + break; + case DCB_REASON_DRAINED: + /** Do we need to do anything? */ + break; + case DCB_REASON_HIGH_WATER: + /** Do we need to do anything? */ + break; + case DCB_REASON_LOW_WATER: + /** Do we need to do anything? */ + break; + case DCB_REASON_ERROR: + dcb->func.error(dcb); + break; + case DCB_REASON_HUP: + dcb->func.hangup(dcb); + break; + case DCB_REASON_NOT_RESPONDING: + dcb->func.hangup(dcb); + break; + default: + break; + } + + return 0; +}