From 7e6cb7afc2c3bcc229f39e97fe7d83147a5e40ef Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Fri, 6 Jun 2014 23:32:04 +0300 Subject: [PATCH] Snapshot of failure tolerance changes. Added a lot of logging to error, trace and message logs which should help the user to handle errors which can't be automatically resolved, like attempt to use nonexisting database. --- query_classifier/query_classifier.cc | 4 +- server/core/dbusers.c | 17 +- server/core/dcb.c | 29 ++- server/core/poll.c | 4 +- server/core/service.c | 6 + server/include/dcb.h | 3 + server/include/router.h | 8 +- server/include/server.h | 10 + server/include/service.h | 1 + .../include/mysql_client_server_protocol.h | 4 +- server/modules/include/readwritesplit.h | 7 + server/modules/monitor/mysql_mon.c | 67 ++++- server/modules/protocol/mysql_backend.c | 31 ++- server/modules/protocol/mysql_client.c | 163 ++++++++++--- server/modules/protocol/mysql_common.c | 67 ++++- server/modules/routing/GaleraHACRoute.c | 13 +- server/modules/routing/readconnroute.c | 17 +- .../routing/readwritesplit/readwritesplit.c | 230 +++++++++++++----- utils/skygw_debug.h | 6 + 19 files changed, 518 insertions(+), 169 deletions(-) diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index aafd746ce..2da6d4ad5 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -116,9 +116,7 @@ skygw_query_type_t skygw_query_classifier_get_type( query_str = const_cast(query); LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "%lu [skygw_query_classifier_get_type] Query : \"%s\"", - pthread_self(), - query_str))); + "Query : \"%s\"", query_str))); /** Get server handle */ mysql = mysql_init(NULL); diff --git a/server/core/dbusers.c b/server/core/dbusers.c index 41dda5a92..dd36d683c 100644 --- a/server/core/dbusers.c +++ b/server/core/dbusers.c @@ -213,9 +213,9 @@ getUsers(SERVICE *service, struct users *users) "Exiting."))); return -1; } - /* - * Attempt to connect to each database in the service in turn until - * we find one that we can connect to or until we run out of databases + /** + * Attempt to connect to one of the databases database or until we run + * out of databases * to try */ server = service->databases; @@ -229,17 +229,6 @@ getUsers(SERVICE *service, struct users *users) NULL, 0) == NULL)) { - if (server == NULL) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Unable to connect to %s:%d, \"%s\"", - server->name, - server->port, - mysql_error(con)))); - mysql_close(con); - return -1; - } server = server->nextdb; } free(dpwd); diff --git a/server/core/dcb.c b/server/core/dcb.c index 16ca964ac..351f7e83a 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -559,7 +559,8 @@ int rc; dcb->fd = fd; /** Copy status field to DCB */ dcb->dcb_server_status = server->status; - + ss_debug(dcb->dcb_port = server->port;) + /*< * backend_dcb is connected to backend server, and once backend_dcb * is added to poll set, authentication takes place as part of @@ -937,7 +938,8 @@ int above_water; above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; spinlock_acquire(&dcb->writeqlock); - if (dcb->writeq) + + if (dcb->writeq) { int len; @@ -996,16 +998,17 @@ int above_water; } spinlock_release(&dcb->writeqlock); atomic_add(&dcb->writeqlen, -n); - /* The write queue has drained, potentially need to call a callback function */ + + /* The write queue has drained, potentially need to call a callback function */ if (dcb->writeq == NULL) dcb_call_callback(dcb, DCB_REASON_DRAINED); - if (above_water && dcb->writeqlen < dcb->low_water) + + if (above_water && dcb->writeqlen < dcb->low_water) { atomic_add(&dcb->stats.n_low_water, 1); dcb_call_callback(dcb, DCB_REASON_LOW_WATER); } - return n; } @@ -1030,7 +1033,8 @@ dcb_close(DCB *dcb) * dcb_close may be called for freshly created dcb, in which case * it only needs to be freed. */ - if (dcb->state == DCB_STATE_ALLOC) { + if (dcb->state == DCB_STATE_ALLOC) + { dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); dcb_final_free(dcb); return; @@ -1047,6 +1051,16 @@ dcb_close(DCB *dcb) ss_dassert(dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); + +#if defined(ERRHANDLE) + /** + * close protocol and router session + */ + if (dcb->func.close != NULL) + { + dcb->func.close(dcb); + } +#endif dcb_call_callback(dcb, DCB_REASON_CLOSE); @@ -1068,7 +1082,8 @@ dcb_close(DCB *dcb) STRDCBSTATE(dcb->state)))); } - if (dcb->state == DCB_STATE_NOPOLLING) { + if (dcb->state == DCB_STATE_NOPOLLING) + { dcb_add_to_zombieslist(dcb); } } diff --git a/server/core/poll.c b/server/core/poll.c index 0b23a6b05..87d3640f0 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -349,8 +349,8 @@ poll_waitevents(void *arg) ss_dassert(dcb->state != DCB_STATE_FREED); ss_debug(spinlock_release(&dcb->dcb_initlock);) - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, "%lu [poll_waitevents] event %d dcb %p " "role %s", pthread_self(), diff --git a/server/core/service.c b/server/core/service.c index cb7bea457..ea177f097 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -991,3 +991,9 @@ static void service_add_qualified_param( (*p)->next = NULL; spinlock_release(&svc->spin); } + +char* service_get_name( + SERVICE* svc) +{ + return svc->name; +} \ No newline at end of file diff --git a/server/include/dcb.h b/server/include/dcb.h index e90a64856..ec0bc5b46 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -23,6 +23,8 @@ #include #include +// #define ERRHANDLE + struct session; struct server; struct service; @@ -222,6 +224,7 @@ typedef struct dcb { unsigned int high_water; /**< High water mark */ unsigned int low_water; /**< Low water mark */ #if defined(SS_DEBUG) + int dcb_port; /**< port of target server */ skygw_chk_t dcb_chk_tail; #endif } DCB; diff --git a/server/include/router.h b/server/include/router.h index 004f91d51..0c467e671 100644 --- a/server/include/router.h +++ b/server/include/router.h @@ -74,7 +74,13 @@ typedef struct router_object { int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue); void (*diagnostics)(ROUTER *instance, DCB *dcb); void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); - void (*errorReply)(ROUTER* instance, void* router_session, char* message, DCB *backend_dcb, int action); + void (*handleError)( + ROUTER* instance, + void* router_session, + char* message, + DCB *backend_dcb, + int action, + bool* succp); uint8_t (*getCapabilities)(ROUTER *instance, void* router_session); } ROUTER_OBJECT; diff --git a/server/include/server.h b/server/include/server.h index 0f093e39b..e36108481 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -77,6 +77,8 @@ typedef struct server { #define SERVER_MASTER 0x0002 /**<< The server is a master, i.e. can handle writes */ #define SERVER_SLAVE 0x0004 /**<< The server is a slave, i.e. can handle reads */ #define SERVER_JOINED 0x0008 /**<< The server is joined in a Galera cluster */ +#define SERVER_MAINT 0x1000 /**<< Server is in maintenance mode */ + /** * Is the server running - the macro returns true if the server is marked as running @@ -107,6 +109,12 @@ typedef struct server { #define SERVER_IS_JOINED(server) \ (((server)->status & (SERVER_RUNNING|SERVER_JOINED)) == (SERVER_RUNNING|SERVER_JOINED)) +/** + * Is the server in maintenance mode. + */ +#define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT) + + extern SERVER *server_alloc(char *, char *, unsigned short); extern int server_free(SERVER *); extern SERVER *server_find_by_unique_name(char *); @@ -121,4 +129,6 @@ extern void server_set_status(SERVER *, int); extern void server_clear_status(SERVER *, int); extern void serverAddMonUser(SERVER *, char *, char *); extern void server_update(SERVER *, char *, char *, char *); +void server_set_unique_name(SERVER *server, char *name); + #endif diff --git a/server/include/service.h b/server/include/service.h index 28ae8d3f3..a2f700ad5 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -162,4 +162,5 @@ bool service_set_slave_conn_limit ( extern void dprintService(DCB *, SERVICE *); extern void dListServices(DCB *); extern void dListListeners(DCB *); +char* service_get_name(SERVICE* svc); #endif diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 41bcf0416..838dd40e6 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -88,7 +88,7 @@ #define SMALL_CHUNK 1024 #define MAX_CHUNK SMALL_CHUNK * 8 * 4 #define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10) - +#define COM_QUIT_PACKET_SIZE (4+1) struct dcb; typedef enum { @@ -104,7 +104,6 @@ typedef enum { MYSQL_SESSION_CHANGE } mysql_pstate_t; - /* * MySQL Protocol specific state data */ @@ -257,6 +256,7 @@ int gw_send_authentication_to_backend( MySQLProtocol *protocol); const char *gw_mysql_protocol_state2string(int state); int gw_do_connect_to_backend(char *host, int port, int* fd); +int mysql_send_com_quit(DCB* dcb, int packet_number); int mysql_send_custom_error ( DCB *dcb, int packet_number, diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index a4eecf4d5..07435f951 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,6 +31,12 @@ #include +typedef enum bref_state { + BREF_NOT_USED, + BREF_IN_USE, + BREF_CLOSED +} bref_state_t; + typedef enum backend_type_t { BE_UNDEFINED=-1, BE_MASTER, @@ -159,6 +165,7 @@ typedef struct backend_ref_st { #endif BACKEND* bref_backend; DCB* bref_dcb; + bref_state_t bref_state; sescmd_cursor_t bref_sescmd_cur; #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index ddf1f7cbc..9cee3aec2 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -287,25 +287,34 @@ char *sep; static void monitorDatabase(MONITOR_SERVERS *database, char *defaultUser, char *defaultPasswd) { -MYSQL_ROW row; -MYSQL_RES *result; -int num_fields; -int ismaster = 0, isslave = 0; -char *uname = defaultUser, *passwd = defaultPasswd; -unsigned long int server_version = 0; -char *server_string; +MYSQL_ROW row; +MYSQL_RES *result; +int num_fields; +int ismaster = 0, isslave = 0; +char *uname = defaultUser, *passwd = defaultPasswd; +unsigned long int server_version = 0; +char *server_string; +static int conn_err_count; +static int modval = 10; - if (database->server->monuser != NULL) + if (database->server->monuser != NULL) { uname = database->server->monuser; passwd = database->server->monpw; } + if (uname == NULL) return; + if (database->con == NULL || mysql_ping(database->con) != 0) { char *dpwd = decryptPassword(passwd); - database->con = mysql_init(NULL); + int rc; + int read_timeout = 1; + + database->con = mysql_init(NULL); + rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); + if (mysql_real_connect(database->con, database->server->name, uname, @@ -315,8 +324,25 @@ char *server_string; NULL, 0) == NULL) { + if (conn_err_count%modval == 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Monitor was unable to connect to " + "server %s:%d : \"%s\"", + database->server->name, + database->server->port, + mysql_error(database->con)))); + conn_err_count = 0; + modval += 1; + } + else + { + conn_err_count += 1; + } free(dpwd); server_clear_status(database->server, SERVER_RUNNING); + return; } free(dpwd); @@ -428,7 +454,6 @@ char *server_string; server_clear_status(database->server, SERVER_SLAVE); server_clear_status(database->server, SERVER_MASTER); } - } /** @@ -441,6 +466,8 @@ monitorMain(void *arg) { MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MONITOR_SERVERS *ptr; +static int err_count; +static int modval = 10; if (mysql_thread_init()) { @@ -463,7 +490,27 @@ MONITOR_SERVERS *ptr; ptr = handle->databases; while (ptr) { + unsigned int prev_status = ptr->server->status; + monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd); + + if (ptr->server->status != prev_status || + (SERVER_IS_DOWN(ptr->server) && + err_count%modval == 0)) + { + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "Backend server %s:%d state : %s", + ptr->server->name, + ptr->server->port, + STRSRVSTATUS(ptr->server)))); + err_count = 0; + modval += 1; + } + else if (SERVER_IS_DOWN(ptr->server)) + { + err_count += 1; + } ptr = ptr->next; } thread_millisleep(10000); diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index efa92a71d..4072a9201 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -71,7 +71,7 @@ static GWPROTOCOL MyObject = { gw_backend_close, /* Close */ NULL, /* Listen */ gw_change_user, /* Authentication */ - gw_session /* Session */ + NULL /* Session */ }; /* @@ -403,7 +403,11 @@ static int gw_read_backend_event(DCB *dcb) { * failed, connection must be closed to avoid backend * dcb from getting hanged. */ +#if defined(ERRHANDLE) + dcb_close(dcb); +#else (dcb->func).close(dcb); +#endif rc = 0; goto return_rc; } @@ -435,7 +439,8 @@ static int gw_read_backend_event(DCB *dcb) { if (client_protocol->state == MYSQL_IDLE) { - router->clientReply(router_instance, + gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL); + router->clientReply(router_instance, rsession, writebuf, dcb); @@ -443,6 +448,7 @@ static int gw_read_backend_event(DCB *dcb) { } goto return_rc; } else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) { + gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL); router->clientReply(router_instance, rsession, writebuf, dcb); rc = 1; } @@ -542,6 +548,8 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) MySQLProtocol *backend_protocol = dcb->protocol; int rc = 0; + ss_dassert(dcb->state == DCB_STATE_POLLING); +#if !defined(ERRHANDLE) /*< * Don't write to backend if backend_dcb is not in poll set anymore. */ @@ -565,6 +573,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) goto return_rc; } spinlock_release(&dcb->dcb_initlock); +#endif spinlock_acquire(&dcb->authlock); /** * Pick action according to state of protocol. @@ -608,6 +617,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) dcb->fd, STRPROTOCOLSTATE(backend_protocol->state)))); spinlock_release(&dcb->authlock); + rc = dcb_write(dcb, queue); goto return_rc; break; @@ -816,16 +826,18 @@ gw_backend_hangup(DCB *dcb) } /** - * Close the backend dcb - * + * Send COM_QUIT to backend so that it can be closed. * @param dcb The current Backend DCB * @return 1 always */ static int gw_backend_close(DCB *dcb) { - /*< vraa : errorHandle */ +#if defined(ERRHANDLE) + mysql_send_com_quit(dcb, 1); +#else dcb_close(dcb); +#endif return 1; } @@ -903,7 +915,12 @@ static int backend_write_delayqueue(DCB *dcb) -static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWBUF *queue) { +static int gw_change_user( + DCB *backend, + SERVER *server, + SESSION *in_session, + GWBUF *queue) +{ MYSQL_session *current_session = NULL; MySQLProtocol *backend_protocol = NULL; MySQLProtocol *client_protocol = NULL; @@ -989,6 +1006,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB * @param * @return always 1 */ +/* static int gw_session(DCB *backend_dcb, void *data) { GWBUF *queue = NULL; @@ -998,3 +1016,4 @@ static int gw_session(DCB *backend_dcb, void *data) { return 1; } +*/ \ No newline at end of file diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index bcd94e423..94d0ed419 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -598,7 +598,8 @@ int gw_read_client_event(DCB* dcb) { else { /** - * There is at least one complete mysql packet read + * There is at least one complete mysql packet in + * read_buffer. */ read_buffer = dcb->dcb_readqueue; dcb->dcb_readqueue = NULL; @@ -623,58 +624,70 @@ int gw_read_client_event(DCB* dcb) { switch (protocol->state) { case MYSQL_AUTH_SENT: - /* - * Read all the data that is available into a chain of buffers - */ { int auth_val = -1; auth_val = gw_mysql_do_authentication(dcb, read_buffer); - // Data handled withot the dcb->func.write - // so consume it now - // be sure to consume it all read_buffer = gwbuf_consume(read_buffer, nbytes_read); + ss_dassert(read_buffer == NULL || GWBUF_EMPTY(read_buffer)); if (auth_val == 0) { SESSION *session = NULL; protocol->state = MYSQL_AUTH_RECV; - //write to client mysql AUTH_OK packet, packet n. is 2 - // start a new session, and connect to backends + /** + * Create session, and a router session for it. + * If successful, there will be backend connection(s) + * after this point. + */ session = session_alloc(dcb->service, dcb); - if (session != NULL) { + if (session != NULL) + { CHK_SESSION(session); ss_dassert(session->state != SESSION_STATE_ALLOC); protocol->state = MYSQL_IDLE; + /** + * Send an AUTH_OK packet to the client, + * packet sequence is # 2 + */ mysql_send_ok(dcb, 2, 0, NULL); - } else { + } + else + { protocol->state = MYSQL_AUTH_FAILED; + /** Send ERR 1045 to client */ mysql_send_auth_error( dcb, 2, 0, "failed to create new session"); +#if defined(ERRHANDLE) + dcb_close(dcb); +#else dcb->func.close(dcb); +#endif } } else { protocol->state = MYSQL_AUTH_FAILED; + /** Send ERR 1045 to client */ mysql_send_auth_error( dcb, 2, 0, "Authorization failed"); +#if defined(ERRHANDLE) + dcb_close(dcb); +#else dcb->func.close(dcb); +#endif } } break; case MYSQL_IDLE: - /* - * Read all the data that is available into a chain of buffers - */ { uint8_t cap = 0; uint8_t *ptr_buff = NULL; @@ -682,14 +695,16 @@ int gw_read_client_event(DCB* dcb) { bool stmt_input; /*< router input type */ session = dcb->session; + ss_dassert( session!= NULL); - // get the backend session, if available - if (session != NULL) { + if (session != NULL) + { CHK_SESSION(session); router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; + ss_dassert(rsession != NULL); } /* Now, we are assuming in the first buffer there is @@ -706,9 +721,11 @@ int gw_read_client_event(DCB* dcb) { * COM_QUIT : close client dcb * else : write custom error to client dcb. */ - if(rsession == NULL) { + if(rsession == NULL) + { /** COM_QUIT */ - if (mysql_command == '\x01') { + if (mysql_command == '\x01') + { LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, "%lu [gw_read_client_event] Client read " @@ -716,8 +733,18 @@ int gw_read_client_event(DCB* dcb) { "client dcb %p.", pthread_self(), dcb))); +#if defined(ERRHANDLE) + /** + * close router session and that closes + * backends + */ + dcb_close(dcb); +#else (dcb->func).close(dcb); - } else { +#endif + } + else + { /* Send a custom error as MySQL command reply */ mysql_send_custom_error( dcb, @@ -732,9 +759,10 @@ int gw_read_client_event(DCB* dcb) { read_buffer = gwbuf_consume(read_buffer, nbytes_read); goto return_rc; } + /** Ask what type of input the router expects */ cap = router->getCapabilities(router_instance, rsession); - + if (cap == 0 || (cap == RCAP_TYPE_PACKET_INPUT)) { stmt_input = false; @@ -752,7 +780,6 @@ int gw_read_client_event(DCB* dcb) { "%lu [gw_read_client_event] Reading router " "capabilities failed.", pthread_self()))); - mysql_send_custom_error(dcb, 1, 0, @@ -761,10 +788,17 @@ int gw_read_client_event(DCB* dcb) { rc = 1; goto return_rc; } - /** Route COM_QUIT to backend */ if (mysql_command == '\x01') { +#if defined(ERRHANDLE) + /** + * Close router session and that closes + * backends. + * Closing backends includes sending COM_QUIT packets. + */ + dcb_close(dcb); +#else router->routeQuery(router_instance, rsession, read_buffer); LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, @@ -774,6 +808,7 @@ int gw_read_client_event(DCB* dcb) { dcb))); /** close client connection, closes router session too */ rc = dcb->func.close(dcb); +#endif } else { @@ -805,6 +840,49 @@ int gw_read_client_event(DCB* dcb) { if (rc == 1) { rc = 0; /**< here '0' means success */ } else { +#if defined(ERRHANDLE2) + bool succp; + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing the query failed. " + "Reselecting backends."))); + + /** + * Decide whether close router and its + * connections or just send an error to client + */ + router->handleError(router_instance, + rsession, + "Query routing failed. " + "Query execution aborted. " + "Reselecting backend.", + NULL, + ERRACT_RELECT_BACKENDS, + &succp); + + if (!succp) + { + router->handleError(router_instance, + rsession, + "Connection to " + "backend lost.", + NULL, + ERRACT_CLOSE_RSES, + NULL); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Reselecting backend " + "servers failed."))); + } + else + { + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "Reselected backend servers."))); + } +#else mysql_send_custom_error(dcb, 1, 0, @@ -812,6 +890,7 @@ int gw_read_client_event(DCB* dcb) { "Connection to backend " "lost."); protocol->state = MYSQL_IDLE; +#endif } } goto return_rc; @@ -1171,23 +1250,31 @@ int gw_MySQLAccept(DCB *listener) client_dcb->fd = c_sock; // get client address - if ( client_conn.sa_family == AF_UNIX) { + if ( client_conn.sa_family == AF_UNIX) + { // client address client_dcb->remote = strdup("localhost_from_socket"); // set localhost IP for user authentication (client_dcb->ipv4).sin_addr.s_addr = 0x0100007F; - } else { + } + else + { /* client IPv4 in raw data*/ - memcpy(&client_dcb->ipv4, (struct sockaddr_in *)&client_conn, sizeof(struct sockaddr_in)); + memcpy(&client_dcb->ipv4, + (struct sockaddr_in *)&client_conn, + sizeof(struct sockaddr_in)); /* client IPv4 in string representation */ client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char)); - if (client_dcb->remote != NULL) { - inet_ntop(AF_INET, &(client_dcb->ipv4).sin_addr, client_dcb->remote, INET_ADDRSTRLEN); + + if (client_dcb->remote != NULL) + { + inet_ntop(AF_INET, + &(client_dcb->ipv4).sin_addr, + client_dcb->remote, + INET_ADDRSTRLEN); } } - protocol = mysql_protocol_init(client_dcb, c_sock); - ss_dassert(protocol != NULL); if (protocol == NULL) { @@ -1224,7 +1311,7 @@ int gw_MySQLAccept(DCB *listener) 0, "MaxScale internal error."); - /** delete client_dcb */ + /** close client_dcb */ dcb_close(client_dcb); /** Previous state is recovered in poll_add_dcb. */ @@ -1265,10 +1352,13 @@ static int gw_error_client_event( int rc; CHK_DCB(dcb); - +#if defined(ERRHANDLE) + dcb_close(dcb); + return 1; +#else rc = dcb->func.close(dcb); - return rc; +#endif } static int @@ -1305,8 +1395,10 @@ gw_client_close(DCB *dcb) router->closeSession(router_instance, rsession); } +#if !defined(ERRHANDLE) + /** close client DCB */ dcb_close(dcb); - +#endif return 1; } @@ -1324,9 +1416,14 @@ gw_client_hangup_event(DCB *dcb) int rc; CHK_DCB(dcb); +#if defined(ERRHANDLE) + dcb_close(dcb); + return 1; +#else rc = dcb->func.close(dcb); return rc; +#endif } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 1d0932d7b..a620c2d8b 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -321,26 +321,27 @@ int gw_receive_backend_auth( } else if (ptr[4] == 0xff) { - size_t packetlen = MYSQL_GET_PACKET_LEN(ptr)+4; - char* bufstr = (char *)calloc(1, packetlen-3); - - snprintf(bufstr, packetlen-6, "%s", &ptr[7]); - + size_t len = MYSQL_GET_PACKET_LEN(ptr); + char* err = strndup(&ptr[8], 5); + char* bufstr = strndup(&ptr[13], len-4-5); + LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [gw_receive_backend_auth] Invalid " "authentication message from backend dcb %p " - "fd %d, ptr[4] = %p, msg %s.", + "fd %d, ptr[4] = %p, error %s, msg %s.", pthread_self(), dcb, dcb->fd, ptr[4], + err, bufstr))); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Invalid authentication message " - "from backend. Msg : %s", + "from backend. Error : %s, Msg : %s", + err, bufstr))); free(bufstr); @@ -367,7 +368,7 @@ int gw_receive_backend_auth( /*< * Remove data from buffer. */ - head = gwbuf_consume(head, GWBUF_LENGTH(head)); + while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL); } else if (n == 0) { @@ -634,8 +635,8 @@ int gw_do_connect_to_backend( LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error: Establishing connection to backend server " - "%s:%d failed.\n\t\t Socket creation failed due " - "%d, %s.", + "%s:%d failed.\n\t\t Socket creation failed " + "due %d, %s.", host, port, eno, @@ -736,6 +737,45 @@ gw_mysql_protocol_state2string (int state) { } } +int mysql_send_com_quit( + DCB* dcb, + int packet_number) +{ + uint8_t *data; + GWBUF *buf; + int nbytes = 0; + + CHK_DCB(dcb); + ss_dassert(packet_number <= 255); + + if (dcb == NULL || + (dcb->state != DCB_STATE_NOPOLLING && + dcb->state != DCB_STATE_ZOMBIE)) + { + return 0; + } + + buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE); + ss_dassert(buf != NULL); + + if (buf == NULL) + { + return 0; + } + data = GWBUF_DATA(buf); + + *data++ = 0x1; + *data++ = 0x0; + *data++ = 0x0; + *data++ = packet_number; + *data = 0x1; + + nbytes = dcb->func.write(dcb, buf); + + return nbytes; +} + + /** * mysql_send_custom_error * @@ -1229,7 +1269,12 @@ int gw_find_mysql_user_password_sha1(char *username, uint8_t *gateway_password, * */ int -mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { +mysql_send_auth_error ( + DCB *dcb, + int packet_number, + int in_affected_rows, + const char *mysql_message) +{ uint8_t *outbuf = NULL; uint8_t mysql_payload_size = 0; uint8_t mysql_packet_header[4]; diff --git a/server/modules/routing/GaleraHACRoute.c b/server/modules/routing/GaleraHACRoute.c index cf3e833ed..4f143143d 100644 --- a/server/modules/routing/GaleraHACRoute.c +++ b/server/modules/routing/GaleraHACRoute.c @@ -58,12 +58,14 @@ static void GHACloseSession(ROUTER *instance, void *router_session); static void GHAFreeSession(ROUTER *instance, void *router_session); static int GHARouteQuery(ROUTER *instance, void *router_session, GWBUF *queue); static void GHADiagnostics(ROUTER *instance, DCB *dcb); + static void GHAClientReply( ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb); -static void GHAErrorReply( + +static void GHAHandleError( ROUTER *instance, void *router_session, char *message, @@ -79,7 +81,7 @@ static ROUTER_OBJECT MyObject = { GHARouteQuery, GHADiagnostics, GHAClientReply, - GHAErrorReply + GHAHandleError }; static bool rses_begin_router_action( @@ -630,10 +632,9 @@ GHAClientReply( } /** - * Error Reply routine + * Error handling routine * - * The routine will reply to client errors and/or closing the session - * or try to open a new backend connection. + * The routine will handle error occurred in backend. * * @param instance The router instance * @param router_session The router session @@ -643,7 +644,7 @@ GHAClientReply( * */ static void -GHAErrorReply( +GHAHandleError( ROUTER *instance, void *router_session, char *message, diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 2bfba7efc..54e910893 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -102,12 +102,13 @@ static void clientReply( void *router_session, GWBUF *queue, DCB *backend_dcb); -static void errorReply( +static void handleError( ROUTER *instance, void *router_session, char *message, DCB *backend_dcb, - int action); + int action, + bool *succp); static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -120,7 +121,7 @@ static ROUTER_OBJECT MyObject = { routeQuery, diagnostics, clientReply, - errorReply, + handleError, getCapabilities }; @@ -681,10 +682,9 @@ clientReply( } /** - * Error Reply routine + * Error Handler routine * - * The routine will reply to client errors and/or closing the session - * or try to open a new backend connection. + * The routine will handle errors that occurred in backend writes. * * @param instance The router instance * @param router_session The router session @@ -694,12 +694,13 @@ clientReply( * */ static void -errorReply( +handleError( ROUTER *instance, void *router_session, char *message, DCB *backend_dcb, - int action) + int action, + bool *succp) { DCB *client = NULL; SESSION *session = backend_dcb->session; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 5b855afe4..d481c5cf9 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -30,6 +30,9 @@ #include #include #include +#if defined(SS_DEBUG) +# include +#endif extern int lm_enabled_logfiles_bitmask; @@ -63,11 +66,23 @@ static void closeSession(ROUTER *instance, void *session); static void freeSession(ROUTER *instance, void *session); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static void diagnostic(ROUTER *instance, DCB *dcb); + static void clientReply( ROUTER* instance, void* router_session, GWBUF* queue, DCB* backend_dcb); + +static void handleError( + ROUTER* instance, + void* router_session, + char* message, + DCB* backend_dcb, + int action, + bool* succp); + +static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); + static uint8_t getCapabilities (ROUTER* inst, void* router_session); int bref_cmp_global_conn( @@ -118,7 +133,7 @@ static ROUTER_OBJECT MyObject = { routeQuery, diagnostic, clientReply, - NULL, + handleError, getCapabilities }; static bool rses_begin_locked_router_action( @@ -585,7 +600,7 @@ static void* newSession( client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; router->stats.n_sessions += 1; - + /** * Version is bigger than zero once initialized. */ @@ -658,11 +673,19 @@ static void closeSession( DCB* dcb = backend_ref[i].bref_dcb; /** Close those which had been connected */ - if (dcb != NULL) + if (backend_ref[i].bref_state == BREF_IN_USE) { CHK_DCB(dcb); - backend_ref[i].bref_dcb = NULL; /*< prevent new uses of DCB */ + backend_ref[i].bref_state = BREF_NOT_USED; + +#if defined(ERRHANDLE) + /** + * closes protocol and dcb + */ + dcb_close(dcb); +#else dcb->func.close(dcb); +#endif /** decrease server current connection counters */ atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); @@ -688,7 +711,7 @@ static void freeSession( for (i=0; irses_nbackends; i++) { - if (backend_ref[i].bref_dcb == NULL) + if (backend_ref[i].bref_state != BREF_IN_USE) { continue; } @@ -763,8 +786,8 @@ static bool get_dcb( for (i=0; irses_nbackends; i++) { BACKEND* b = backend_ref[i].bref_backend; - - if (backend_ref[i].bref_dcb != NULL && + + if (backend_ref[i].bref_state == BREF_IN_USE && SERVER_IS_SLAVE(b->backend_server) && (smallest_nconn == -1 || b->backend_conn_count < smallest_nconn)) @@ -778,8 +801,8 @@ static bool get_dcb( if (!succp) { backend_ref = rses->rses_master_ref; - - if (backend_ref->bref_dcb != NULL) + + if (backend_ref[i].bref_state == BREF_IN_USE) { *p_dcb = backend_ref->bref_dcb; succp = true; @@ -799,13 +822,13 @@ static bool get_dcb( } ss_dassert(succp); } - else if (btype == BE_MASTER || BE_JOINED) + else if (btype == BE_MASTER) { for (i=0; irses_nbackends; i++) { BACKEND* b = backend_ref[i].bref_backend; - if (backend_ref[i].bref_dcb != NULL && + if (backend_ref[i].bref_state == BREF_IN_USE && (SERVER_IS_MASTER(b->backend_server) || SERVER_IS_JOINED(b->backend_server))) { @@ -930,23 +953,7 @@ static int routeQuery( default: break; } /**< switch by packet type */ -#if 0 - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "String\t\"%s\"", - querystr == NULL ? "(empty)" : querystr))); - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Packet type\t%s", - STRPACKETTYPE(packet_type)))); -#endif -#if defined(AUTOCOMMIT_OPT) - if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) && - !router_cli_ses->rses_autocommit_enabled) || - (QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) && - router_cli_ses->rses_autocommit_enabled)) - { - /** reply directly to client */ - } -#endif + /** * If autocommit is disabled or transaction is explicitly started * transaction becomes active and master gets all statements until @@ -1128,6 +1135,7 @@ static bool rses_begin_locked_router_action( CHK_CLIENT_RSES(rses); if (rses->rses_closed) { + goto return_succp; } spinlock_acquire(&rses->rses_lock); @@ -1138,10 +1146,6 @@ static bool rses_begin_locked_router_action( succp = true; return_succp: - if (!succp) - { - /** log that router session was closed */ - } return succp; } @@ -1242,9 +1246,7 @@ static void clientReply( */ if (!rses_begin_locked_router_action(router_cli_ses)) { - while ((writebuf = gwbuf_consume( - writebuf, - GWBUF_LENGTH(writebuf))) != NULL); + print_error_packet(router_cli_ses, writebuf, backend_dcb); goto lock_failed; } /** Holding lock ensures that router session remains open */ @@ -1311,14 +1313,9 @@ static void clientReply( { /** Write reply to client DCB */ client_dcb->func.write(client_dcb, writebuf); - - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [clientReply:rwsplit] client dcb %p, " - "backend dcb %p. End of normal reply.", - pthread_self(), - client_dcb, - backend_dcb))); + /** + * Log reply but use identifier for query + */ } lock_failed: @@ -1385,6 +1382,8 @@ int bref_cmp_behind_master( * * @details It is assumed that there is only one master among servers of * a router instance. As a result, the first master found is chosen. + * There will possibly be more backend references than connected backends + * because only those in correct state are connected to. */ static bool select_connect_backend_servers( backend_ref_t** p_master_ref, @@ -1423,13 +1422,13 @@ static bool select_connect_backend_servers( if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */ { is_synced_master = true; - } + } else { is_synced_master = false; - } - -#if 0 + } + +#if defined(EXTRA_DEBUGGING) LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:"))); for (i=0; ibitvalue is %d", - pthread_self(), b->backend_server->name, b->backend_server->port, + STRSRVSTATUS(b->backend_server), b->backend_conn_count, - b->backend_server->status, - router->bitmask))); + router->bitmask))); if (SERVER_IS_RUNNING(b->backend_server) && ((b->backend_server->status & router->bitmask) == @@ -1524,6 +1522,7 @@ static bool select_connect_backend_servers( if (backend_ref[i].bref_dcb != NULL) { slaves_connected += 1; + backend_ref[i].bref_state = BREF_IN_USE; /** * Increase backend connection counter. * Server's stats are _increased_ in @@ -1558,6 +1557,7 @@ static bool select_connect_backend_servers( if (backend_ref[i].bref_dcb != NULL) { master_connected = true; + backend_ref[i].bref_state = BREF_IN_USE; *p_master_ref = &backend_ref[i]; /** Increase backend connection counter */ /** Increase backend connection counter */ @@ -1577,6 +1577,18 @@ static bool select_connect_backend_servers( /* handle connect error */ } } + else + { + succp = false; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with server %s:%d, %s", + b->backend_server->name, + b->backend_server->port, + STRSRVSTATUS(b->backend_server)))); + /* handle connect error */ + } } } /*< for */ @@ -1637,8 +1649,8 @@ static bool select_connect_backend_servers( for (i=0; ibackend_conn_count > 0); /** disconnect opened connections */ @@ -2074,8 +2086,8 @@ static bool execute_sescmd_in_backend( bool succp = true; int rc = 0; sescmd_cursor_t* scur; - - if (backend_ref->bref_dcb == NULL) + + if (backend_ref->bref_state == BREF_CLOSED) { goto return_succp; } @@ -2349,8 +2361,8 @@ static bool route_session_write( for (i=0; irses_nbackends; i++) { DCB* dcb = backend_ref[i].bref_dcb; - - if (dcb != NULL) + + if (backend_ref[i].bref_state == BREF_IN_USE) { rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); @@ -2384,13 +2396,19 @@ static bool route_session_write( for (i=0; irses_nbackends; i++) { - succp = execute_sescmd_in_backend(&backend_ref[i]); - - if (!succp) + if (backend_ref[i].bref_state == BREF_IN_USE) { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - goto return_succp; + succp = execute_sescmd_in_backend(&backend_ref[i]); + + if (!succp) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to execute session " + "command in %s:%d", + backend_ref[i].bref_backend->backend_server->name, + backend_ref[i].bref_backend->backend_server->port))); + } } } /** Unlock router session */ @@ -2450,4 +2468,84 @@ static void rwsplit_process_options( } } } /*< for */ -} \ No newline at end of file +} + +/** + * Error Handler routine + * + * The routine will handle errors that occurred in backend writes. + * + * @param instance The router instance + * @param router_session The router session + * @param message The error message to reply + * @param backend_dcb The backend DCB + * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION + * + */ +static void handleError ( + ROUTER *instance, + void *router_session, + char *message, + DCB *backend_dcb, + int action, + bool *succp) +{ + DCB* client_dcb = NULL; + SESSION* session = backend_dcb->session; + + client_dcb = session->client; + + ss_dassert(client_dcb != NULL); +} + +static void print_error_packet( + ROUTER_CLIENT_SES* rses, + GWBUF* buf, + DCB* dcb) +{ + if (buf->gwbuf_type == GWBUF_TYPE_MYSQL) + { + while (gwbuf_length(buf) > 0) + { + /** + * This works with MySQL protocol only ! + * Protocol specific packet print functions would be nice. + */ + uint8_t* ptr = GWBUF_DATA(buf); + size_t len = MYSQL_GET_PACKET_LEN(ptr); + + if (MYSQL_GET_COMMAND(ptr) == 0xff) + { + SERVER* srv = NULL; + backend_ref_t* bref = rses->rses_backend_ref; + int i; + char* bufstr; + + for (i=0; irses_nbackends; i++) + { + if (bref[i].bref_dcb == dcb) + { + srv = bref[i].bref_backend->backend_server; + } + } + ss_dassert(srv != NULL); + + bufstr = strndup(&ptr[7], len-3); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Backend server %s:%d responded with " + "error : %s", + srv->name, + srv->port, + bufstr))); + free(bufstr); + } + buf = gwbuf_consume(buf, len+4); + } + } + else + { + while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL); + } +} diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index 7277cb2e5..6e129152d 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -228,6 +228,12 @@ typedef enum skygw_chk_t { ((c) == LEAST_GLOBAL_CONNECTIONS ? "LEAST_GLOBAL_CONNECTIONS" : \ ((c) == LEAST_ROUTER_CONNECTIONS ? "LEAST_ROUTER_CONNECTIONS" : \ ((c) == LEAST_BEHIND_MASTER ? "LEAST_BEHIND_MASTER" : "Unknown criteria")))) + +#define STRSRVSTATUS(s) ((SERVER_IS_RUNNING(s) && SERVER_IS_MASTER(s)) ? "RUNNING MASTER" : \ + ((SERVER_IS_RUNNING(s) && SERVER_IS_SLAVE(s)) ? "RUNNING SLAVE" : \ + ((SERVER_IS_RUNNING(s) && SERVER_IS_JOINED(s)) ? "RUNNING JOINED" : \ + ((SERVER_IS_RUNNING(s) && SERVER_IN_MAINT(s)) ? "RUNNING MAINTENANCE" : \ + (SERVER_IS_RUNNING(s) ? "RUNNING (only)" : "NO STATUS"))))) #define CHK_MLIST(l) { \ ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \