From 5bcae6453835a9b564af772d338ec7211cd8b337 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Sun, 15 Jun 2014 23:44:07 +0300 Subject: [PATCH] When protocol closes DCB it calls dcb_close instead of dcb->func.close. dcb_close then calls dcb->func.close. This is now changed to all protocols and routers. Rwsplit handles ERRACT_NEW_CONNECTION by clearing backend reference, removing callbacks and associating backend reference with new backend server. If it succeeds and the router session can continue, handleError returns true. Otherwise false. When ever false is returned it means that session must be closed. Rwsplit now tolerates backend failures in a way that it searches new backends when monitor, backend, or client operation fails due to backend failure. --- server/core/dcb.c | 48 +-- server/include/dcb.h | 1 + server/include/router.h | 22 +- server/include/server.h | 5 + server/include/session.h | 2 +- .../include/mysql_client_server_protocol.h | 6 + server/modules/monitor/mysql_mon.c | 1 + server/modules/protocol/httpd.c | 3 +- server/modules/protocol/mysql_backend.c | 245 +++++---------- server/modules/protocol/mysql_client.c | 122 ++++---- server/modules/protocol/mysql_common.c | 152 ++++----- server/modules/protocol/telnetd.c | 3 +- server/modules/routing/GaleraHACRoute.c | 2 +- server/modules/routing/debugcli.c | 2 +- server/modules/routing/readconnroute.c | 2 +- .../routing/readwritesplit/readwritesplit.c | 289 +++++++++++++----- 16 files changed, 472 insertions(+), 433 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index fe5b29590..fd96361c4 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -110,6 +110,7 @@ DCB *rval; #if defined(SS_DEBUG) rval->dcb_chk_top = CHK_NUM_DCB; rval->dcb_chk_tail = CHK_NUM_DCB; + rval->dcb_errhandle_called = false; #endif rval->dcb_role = role; #if 1 @@ -149,7 +150,7 @@ DCB *rval; /** - * Free a DCB that has not been associated with a decriptor. + * Free a DCB that has not been associated with a descriptor. * * @param dcb The DCB to free */ @@ -957,7 +958,6 @@ int above_water; if (dcb->writeq) { int len; - /* * Loop over the buffer chain in the pending writeq * Send as much of the data in that chain as possible and @@ -1042,9 +1042,7 @@ void dcb_close(DCB *dcb) { int rc; -#if defined(ERRHANDLE) - bool isclient; -#endif + CHK_DCB(dcb); /*< @@ -1062,21 +1060,13 @@ dcb_close(DCB *dcb) dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); - - -#if defined(ERRHANDLE) - isclient = dcb_isclient(dcb); + /*< + * Stop dcb's listening and modify state accordingly. + */ + rc = poll_remove_dcb(dcb); - if (isclient) - { - /*< - * Stop dcb's listening and modify state accordingly. - */ - rc = poll_remove_dcb(dcb); - - ss_dassert(dcb->state == DCB_STATE_NOPOLLING || + ss_dassert(dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); - } /** * close protocol and router session */ @@ -1085,26 +1075,6 @@ dcb_close(DCB *dcb) dcb->func.close(dcb); } - if (!isclient) - { - /*< - * Stop dcb's listening and modify state accordingly. - */ - rc = poll_remove_dcb(dcb); - - ss_dassert(dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE); - } -#else - /*< - * Stop dcb's listening and modify state accordingly. - */ - rc = poll_remove_dcb(dcb); - - ss_dassert(dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE); -#endif - dcb_call_callback(dcb, DCB_REASON_CLOSE); if (rc == 0) { @@ -1654,7 +1624,7 @@ int rval = 0; if (cb->reason == reason && cb->cb == callback && cb->userdata == userdata) { - if (pcb == NULL) + if (pcb != NULL) pcb->next = cb->next; else dcb->callbacks = cb->next; diff --git a/server/include/dcb.h b/server/include/dcb.h index dfc97ccce..405fb16f1 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -195,6 +195,7 @@ typedef struct dcb_callback { typedef struct dcb { #if defined(SS_DEBUG) skygw_chk_t dcb_chk_top; + bool dcb_errhandle_called; #endif dcb_role_t dcb_role; SPINLOCK dcb_initlock; diff --git a/server/include/router.h b/server/include/router.h index 14302d230..8f0851091 100644 --- a/server/include/router.h +++ b/server/include/router.h @@ -66,6 +66,12 @@ typedef void *ROUTER; * * @see load_module */ +typedef enum error_action { + ERRACT_NEW_CONNECTION = 0x001, + ERRACT_REPLY_CLIENT = 0x002 +} error_action_t; + + typedef struct router_object { ROUTER *(*createInstance)(SERVICE *service, char **options); void *(*newSession)(ROUTER *instance, SESSION *session); @@ -75,12 +81,12 @@ typedef struct router_object { void (*diagnostics)(ROUTER *instance, DCB *dcb); void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); void (*handleError)( - ROUTER* instance, - void* router_session, - char* message, - DCB *backend_dcb, - int action, - bool* succp); + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp); uint8_t (*getCapabilities)(ROUTER *instance, void* router_session); } ROUTER_OBJECT; @@ -97,10 +103,6 @@ typedef enum router_capability_t { RCAP_TYPE_PACKET_INPUT = (1 << 1) } router_capability_t; -typedef enum error_action { - ERRACT_NEW_CONNECTION = 0x001, - ERRACT_REPLY_CLIENT = 0x002 -} error_action_t; #endif diff --git a/server/include/server.h b/server/include/server.h index b15453c18..d32413bb7 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -117,6 +117,11 @@ typedef struct server { */ #define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT) +/** server is not master, slave or joined */ +#define SERVER_NOT_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) == 0) + +#define SERVER_IS_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) != 0) + extern SERVER *server_alloc(char *, char *, unsigned short); extern int server_free(SERVER *); extern SERVER *server_find_by_unique_name(char *); diff --git a/server/include/session.h b/server/include/session.h index f99982802..a2415c785 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -57,7 +57,7 @@ typedef enum { SESSION_STATE_ALLOC, /*< for all sessions */ SESSION_STATE_READY, /*< for router session */ SESSION_STATE_ROUTER_READY, /*< for router session */ - SESSION_STATE_STOPPING, /*< router is being closed */ + SESSION_STATE_STOPPING, /*< session and router are being closed */ SESSION_STATE_LISTENER, /*< for listener session */ SESSION_STATE_LISTENER_STOPPED, /*< for listener session */ SESSION_STATE_FREE /*< for all sessions */ diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 5f5d30735..bb7338a3e 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -265,6 +265,12 @@ int mysql_send_custom_error ( int packet_number, int in_affected_rows, const char* mysql_message); + +GWBUF* mysql_create_custom_error( + int packet_number, + int affected_rows, + const char* msg); + int gw_send_change_user_to_backend( char *dbname, char *user, diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index f522e7006..2fdc6a97e 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -144,6 +144,7 @@ MYSQL_MONITOR *handle; handle->defaultPasswd = NULL; handle->id = MONITOR_DEFAULT_ID; handle->interval = MONITOR_INTERVAL; + handle->replicationHeartbeat = 0; spinlock_init(&handle->lock); } handle->tid = (THREAD)thread_start(monitorMain, handle); diff --git a/server/modules/protocol/httpd.c b/server/modules/protocol/httpd.c index 7d06264b9..7db1366ad 100644 --- a/server/modules/protocol/httpd.c +++ b/server/modules/protocol/httpd.c @@ -245,7 +245,7 @@ HTTPD_session *client_data = NULL; } /* force the client connecton close */ - dcb->func.close(dcb); + dcb_close(dcb); return 0; } @@ -359,7 +359,6 @@ int n_connect = 0; static int httpd_close(DCB *dcb) { - dcb_close(dcb); return 0; } diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 588443c63..ad5a7ac20 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -371,23 +371,11 @@ static int gw_read_backend_event(DCB *dcb) { spinlock_acquire(&session->ses_lock); session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); - - /** - * rsession shouldn't be NULL since session - * state indicates that it was initialized - * successfully. + /** + * Start terminating the session + * by closing the client. */ - rsession = session->router_session; - ss_dassert(rsession != NULL); - - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_read_backend_event] " - "Call closeSession for backend's " - "router client session.", - pthread_self()))); - /* close router_session */ - router->closeSession(router_instance, rsession); + dcb_close(session->client); rc = 1; goto return_rc; } @@ -432,35 +420,41 @@ static int gw_read_backend_event(DCB *dcb) { /* read available backend data */ rc = dcb_read(dcb, &writebuf); - if (rc < 0) { + if (rc < 0) + { /*< vraa : errorHandle */ /*< * Backend generated EPOLLIN event and if backend has * failed, connection must be closed to avoid backend * dcb from getting hanged. */ -#if defined(ERRHANDLE) - bool succp; + GWBUF* errbuf; + bool succp; /** * - send error for client * - mark failed backend BREF_NOT_USED * - go through all servers and select one according to * the criteria that user specified in the beginning. */ + errbuf = mysql_create_custom_error( + 1, + 0, + "Read from backend failed"); + router->handleError(router_instance, rsession, - "Read from backend failed", + errbuf, dcb, ERRACT_NEW_CONNECTION, &succp); - + if (!succp) { - dcb_close(dcb); + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); } -#else - (dcb->func).close(dcb); -#endif + dcb_close(dcb); rc = 0; goto return_rc; } @@ -469,15 +463,6 @@ static int gw_read_backend_event(DCB *dcb) { rc = 0; goto return_rc; } - - /* Note the gwbuf doesn't have here a valid queue->command - * descriptions as it is a fresh new one! - * We only have the copied value in dcb->command from - * previuos func.write() and this will be used by the - * router->clientReply - * and pass now the gwbuf to the router - */ - /*< * If dcb->session->client is freed already it may be NULL. */ @@ -598,33 +583,6 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) MySQLProtocol *backend_protocol = dcb->protocol; int rc = 0; - ss_dassert(dcb->state == DCB_STATE_POLLING); -#if !defined(ERRHANDLE) - /*< - * Don't write to backend if backend_dcb is not in poll set anymore. - */ - spinlock_acquire(&dcb->dcb_initlock); - - if (dcb->state != DCB_STATE_POLLING) - { - /*< vraa : errorHandle */ - /*< Free buffer memory */ - gwbuf_consume(queue, GWBUF_LENGTH(queue)); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [gw_MySQLWrite_backend] Write to backend failed. " - "Backend dcb %p fd %d is %s.", - pthread_self(), - dcb, - dcb->fd, - STRDCBSTATE(dcb->state)))); - spinlock_release(&dcb->dcb_initlock); - rc = 0; - goto return_rc; - } - spinlock_release(&dcb->dcb_initlock); -#endif spinlock_acquire(&dcb->authlock); /** * Pick action according to state of protocol. @@ -697,88 +655,50 @@ return_rc: } /** - * Backend Error Handling for EPOLLER - * + * Error event handler. + * Create error message, pass it to router's error handler and if error + * handler fails in providing enough backend servers, mark session being + * closed and call DCB close function which triggers closing router session + * and related backends (if any exists. */ static int gw_error_backend_event(DCB *dcb) { - SESSION *session; - void *rsession; - ROUTER_OBJECT *router; - ROUTER *router_instance; - int rc = 0; - + SESSION* session; + void* rsession; + ROUTER_OBJECT* router; + ROUTER* router_instance; + int rc = 0; + GWBUF* errbuf; + bool succp; + CHK_DCB(dcb); session = dcb->session; CHK_SESSION(session); + rsession = session->router_session; + router = session->service->router; + router_instance = session->service->router_instance; - router = session->service->router; - router_instance = session->service->router_instance; + errbuf = mysql_create_custom_error( + 1, + 0, + "Lost connection to backend server."); -#if defined(ERRHANDLE2) router->handleError(router_instance, - rsession, - "Connection to backend server failed", - dcb, - ERRACT_NEW_CONNECTION, - &succp); - - if (!succp) - { - dcb_close(dcb); - } -#else - if (dcb->state != DCB_STATE_POLLING) { - /*< vraa : errorHandle */ - /*< - * if client is not available it needs to be handled in send - * function. Session != NULL, that is known. - */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Writing to backend failed."); - - rc = 0; - } else { - /*< vraa : errorHandle */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Closed backend connection."); - rc = 1; - } - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_error_backend_event] Some error occurred in backend. " - "rc = %d", - pthread_self(), - rc))); - - if (session->state == SESSION_STATE_ROUTER_READY) - { + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + /** There are not required backends available, close session. */ + if (!succp) { spinlock_acquire(&session->ses_lock); session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); - - rsession = session->router_session; - /*< - * rsession should never be NULL here. - */ - ss_dassert(rsession != NULL); - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_error_backend_event] " - "Call closeSession for backend " - "session.", - pthread_self()))); - - router->closeSession(router_instance, rsession); } -#endif - return rc; + dcb_close(dcb); + + return 1; } /* @@ -879,7 +799,11 @@ return_fd: /** - * Hangup routine the backend dcb: it does nothing + * Error event handler. + * Create error message, pass it to router's error handler and if error + * handler fails in providing enough backend servers, mark session being + * closed and call DCB close function which triggers closing router session + * and related backends (if any exists. * * @param dcb The current Backend DCB * @return 1 always @@ -893,6 +817,7 @@ gw_backend_hangup(DCB *dcb) ROUTER* router_instance; int rc = 0; bool succp; + GWBUF* errbuf; CHK_DCB(dcb); session = dcb->session; @@ -901,42 +826,26 @@ gw_backend_hangup(DCB *dcb) router = session->service->router; router_instance = session->service->router_instance; - mysql_send_custom_error( - dcb->session->client, - 1, - 0, + errbuf = mysql_create_custom_error( + 1, + 0, "Lost connection to backend server."); - - /** - * errorHandle : - * - sulje katkennut yhteys - miten? - * - etsi riittävä määrä servereitä - * - jos epäonnistui, sammuta sessio - * - jos onnistui, jatka - * - * Jos sammutetaan : - * - dcb_close - backend->func.close() - */ - - /*< vraa : errorHandle */ - /* - * - - lähetä virheviesti clientille jos odottaa - errorHandle : - - etsi riittävä määrä servereitä - - jos epäonnistui, sammuta sessio - - jos onnistui, jatka - */ - router->handleError(router_instance, - rsession, - "Lost connection to backend server", + + router->handleError(router_instance, + rsession, + errbuf, dcb, ERRACT_NEW_CONNECTION, &succp); - if (succp) { - dcb_close(dcb); + /** There are not required backends available, close session. */ + if (!succp) { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); } + dcb_close(dcb); + return 1; } @@ -948,7 +857,6 @@ gw_backend_hangup(DCB *dcb) static int gw_backend_close(DCB *dcb) { -#if defined(ERRHANDLE) DCB* client_dcb; SESSION* session; GWBUF* quitbuf; @@ -957,15 +865,13 @@ gw_backend_close(DCB *dcb) CHK_DCB(dcb); session = dcb->session; CHK_SESSION(session); - + quitbuf = mysql_create_com_quit(NULL, 0); - + /** Send COM_QUIT to the backend being closed */ mysql_send_com_quit(dcb, 0, quitbuf); - if (session != NULL && - (session->state == SESSION_STATE_ROUTER_READY || - session->state == SESSION_STATE_READY)) + if (session != NULL && session->state == SESSION_STATE_STOPPING) { client_dcb = session->client; @@ -976,9 +882,6 @@ gw_backend_close(DCB *dcb) dcb_close(client_dcb); } } -#else - dcb_close(dcb); -#endif return 1; } @@ -1048,6 +951,8 @@ static int backend_write_delayqueue(DCB *dcb) "Failed to write buffered data to back-end server. " "Buffer was empty or back-end was disconnected during " "operation."); + + dcb->session->state = SESSION_STATE_STOPPING; dcb_close(dcb); } diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 9c6955f7d..5f6c7be0f 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -522,6 +522,12 @@ int gw_read_client_event( if (rc < 0) { + if (dcb->session != NULL) + { + spinlock_acquire(&dcb->session->ses_lock); + dcb->session->state = SESSION_STATE_STOPPING; + spinlock_release(&dcb->session->ses_lock); + } dcb_close(dcb); } nbytes_read = gwbuf_length(read_buffer); @@ -631,11 +637,8 @@ int gw_read_client_event( 2, 0, "failed to create new session"); -#if defined(ERRHANDLE) + dcb_close(dcb); -#else - dcb->func.close(dcb); -#endif } } else @@ -655,11 +658,8 @@ int gw_read_client_event( 2, 0, "Authorization failed"); -#if defined(ERRHANDLE) + dcb_close(dcb); -#else - dcb->func.close(dcb); -#endif } } break; @@ -710,15 +710,17 @@ int gw_read_client_event( "client dcb %p.", pthread_self(), dcb))); -#if defined(ERRHANDLE) /** * close router session and that closes * backends */ + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } dcb_close(dcb); -#else - (dcb->func).close(dcb); -#endif } else { @@ -769,7 +771,6 @@ int gw_read_client_event( /** Route COM_QUIT to backend */ if (mysql_command == '\x01') { -#if defined(ERRHANDLE) /** * Sends COM_QUIT packets since buffer is already * created. A BREF_CLOSED flag is set so dcb_close won't @@ -779,18 +780,11 @@ int gw_read_client_event( /** * Close router session which causes closing of backends. */ + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + dcb_close(dcb); -#else - SESSION_ROUTE_QUERY(session, read_buffer); - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_read_client_event] Routed COM_QUIT to " - "backend. Close client dcb %p", - pthread_self(), - dcb))); - /** close client connection, closes router session too */ - rc = dcb->func.close(dcb); -#endif } else { @@ -818,45 +812,36 @@ int gw_read_client_event( if (rc == 1) { rc = 0; /**< here '0' means success */ } else { -#if defined(ERRHANDLE) - bool succp; + GWBUF* errbuf; + bool succp; + + errbuf = mysql_create_custom_error( + 1, + 0, + "Write to backend failed. Session closed."); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Routing the query failed. " - "Reselecting backends."))); + "Session will be closed."))); router->handleError(router_instance, rsession, - "Write to backend failed.", + errbuf, dcb, - ERRACT_NEW_CONNECTION, + ERRACT_REPLY_CLIENT, &succp); if (!succp) { - - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Reselecting backend " - "servers failed."))); - + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } dcb_close(dcb); } - - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "Reselected backend servers."))); - -#else - mysql_send_custom_error(dcb, - 1, - 0, - "Query routing failed. " - "Connection to backend " - "lost."); - protocol->state = MYSQL_IDLE; -#endif } } goto return_rc; @@ -1314,17 +1299,22 @@ return_rc: static int gw_error_client_event( DCB* dcb) - { +{ int rc; + SESSION* session; CHK_DCB(dcb); -#if defined(ERRHANDLE) + session = dcb->session; + CHK_SESSION(session); + + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } dcb_close(dcb); return 1; -#else - rc = dcb->func.close(dcb); - return rc; -#endif } static int @@ -1361,10 +1351,6 @@ gw_client_close(DCB *dcb) router->closeSession(router_instance, rsession); } -#if !defined(ERRHANDLE) - /** close client DCB */ - dcb_close(dcb); -#endif return 1; } @@ -1379,17 +1365,21 @@ gw_client_close(DCB *dcb) static int gw_client_hangup_event(DCB *dcb) { - int rc; + int rc; + SESSION* session; CHK_DCB(dcb); -#if defined(ERRHANDLE) + session = dcb->session; + CHK_SESSION(session); + + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } dcb_close(dcb); return 1; -#else - rc = dcb->func.close(dcb); - - return rc; -#endif } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 4ab8ad3ea..b546a0a88 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -868,6 +868,78 @@ int mysql_send_com_quit( } +GWBUF* mysql_create_custom_error( + int packet_number, + int affected_rows, + const char* msg) +{ + uint8_t* outbuf = NULL; + uint8_t mysql_payload_size = 0; + uint8_t mysql_packet_header[4]; + uint8_t* mysql_payload = NULL; + uint8_t field_count = 0; + uint8_t mysql_err[2]; + uint8_t mysql_statemsg[6]; + unsigned int mysql_errno = 0; + const char* mysql_error_msg = NULL; + const char* mysql_state = NULL; + + GWBUF* errbuf = NULL; + + mysql_errno = 2003; + mysql_error_msg = "An errorr occurred ..."; + mysql_state = "HY000"; + + field_count = 0xff; + gw_mysql_set_byte2(mysql_err, mysql_errno); + mysql_statemsg[0]='#'; + memcpy(mysql_statemsg+1, mysql_state, 5); + + if (msg != NULL) { + mysql_error_msg = msg; + } + + mysql_payload_size = sizeof(field_count) + + sizeof(mysql_err) + + sizeof(mysql_statemsg) + + strlen(mysql_error_msg); + + /** allocate memory for packet header + payload */ + errbuf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size); + ss_dassert(errbuf != NULL); + + if (errbuf == NULL) + { + return 0; + } + outbuf = GWBUF_DATA(errbuf); + + /** write packet header and packet number */ + gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size); + mysql_packet_header[3] = packet_number; + + /** write header */ + memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header)); + + mysql_payload = outbuf + sizeof(mysql_packet_header); + + /** write field */ + memcpy(mysql_payload, &field_count, sizeof(field_count)); + mysql_payload = mysql_payload + sizeof(field_count); + + /** write errno */ + memcpy(mysql_payload, mysql_err, sizeof(mysql_err)); + mysql_payload = mysql_payload + sizeof(mysql_err); + + /** write sqlstate */ + memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg)); + mysql_payload = mysql_payload + sizeof(mysql_statemsg); + + /** write error message */ + memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg)); + + return errbuf; +} /** * mysql_send_custom_error * @@ -881,79 +953,21 @@ int mysql_send_com_quit( * @return packet length * */ -int -mysql_send_custom_error (DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { - uint8_t *outbuf = NULL; - uint8_t mysql_payload_size = 0; - uint8_t mysql_packet_header[4]; - uint8_t *mysql_payload = NULL; - uint8_t field_count = 0; - uint8_t mysql_err[2]; - uint8_t mysql_statemsg[6]; - unsigned int mysql_errno = 0; - const char *mysql_error_msg = NULL; - const char *mysql_state = NULL; +int mysql_send_custom_error ( + DCB *dcb, + int packet_number, + int in_affected_rows, + const char *mysql_message) +{ + GWBUF* buf; + int nbytes; - GWBUF *buf = NULL; - - if (dcb == NULL || - dcb->state != DCB_STATE_POLLING) - { - return 0; - } - mysql_errno = 2003; - mysql_error_msg = "An errorr occurred ..."; - mysql_state = "HY000"; - - field_count = 0xff; - gw_mysql_set_byte2(mysql_err, mysql_errno); - mysql_statemsg[0]='#'; - memcpy(mysql_statemsg+1, mysql_state, 5); - - if (mysql_message != NULL) { - mysql_error_msg = mysql_message; - } - - mysql_payload_size = sizeof(field_count) + sizeof(mysql_err) + sizeof(mysql_statemsg) + strlen(mysql_error_msg); - - // allocate memory for packet header + payload - buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size); - ss_dassert(buf != NULL); + buf = mysql_create_custom_error(dcb, in_affected_rows, mysql_message); - if (buf == NULL) - { - return 0; - } - outbuf = GWBUF_DATA(buf); - - // write packet header with packet number - gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size); - mysql_packet_header[3] = packet_number; - - // write header - memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header)); - - mysql_payload = outbuf + sizeof(mysql_packet_header); - - // write field - memcpy(mysql_payload, &field_count, sizeof(field_count)); - mysql_payload = mysql_payload + sizeof(field_count); - - // write errno - memcpy(mysql_payload, mysql_err, sizeof(mysql_err)); - mysql_payload = mysql_payload + sizeof(mysql_err); - - // write sqlstate - memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg)); - mysql_payload = mysql_payload + sizeof(mysql_statemsg); - - // write err messg - memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg)); - - // writing data in the Client buffer queue + nbytes = GWBUF_LENGTH(buf); dcb->func.write(dcb, buf); - return sizeof(mysql_packet_header) + mysql_payload_size; + return GWBUF_LENGTH(buf); } /** diff --git a/server/modules/protocol/telnetd.c b/server/modules/protocol/telnetd.c index 86e98f397..e8e8ec7c0 100644 --- a/server/modules/protocol/telnetd.c +++ b/server/modules/protocol/telnetd.c @@ -343,8 +343,7 @@ TELNETD *telnetd = dcb->protocol; if (telnetd && telnetd->username) free(telnetd->username); - dcb_close(dcb); - return 0; + return 0; } /** diff --git a/server/modules/routing/GaleraHACRoute.c b/server/modules/routing/GaleraHACRoute.c index 4f143143d..33a9597cc 100644 --- a/server/modules/routing/GaleraHACRoute.c +++ b/server/modules/routing/GaleraHACRoute.c @@ -491,7 +491,7 @@ DCB* backend_dcb; */ if (backend_dcb != NULL) { CHK_DCB(backend_dcb); - backend_dcb->func.close(backend_dcb); + dcb_close(backend_dcb); } } } diff --git a/server/modules/routing/debugcli.c b/server/modules/routing/debugcli.c index bf2404c08..e58a25163 100644 --- a/server/modules/routing/debugcli.c +++ b/server/modules/routing/debugcli.c @@ -295,7 +295,7 @@ CLI_SESSION *session = (CLI_SESSION *)router_session; if (execute_cmd(session)) dcb_printf(session->session->client, "MaxScale> "); else - session->session->client->func.close(session->session->client); + dcb_close(session->session->client); } return 1; } diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 8ca0d0b78..bc6af395a 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -552,7 +552,7 @@ DCB* backend_dcb; */ if (backend_dcb != NULL) { CHK_DCB(backend_dcb); - backend_dcb->func.close(backend_dcb); + dcb_close(backend_dcb); } } } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index f18f0ac18..9e7ef2cc0 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -82,13 +82,13 @@ static void clientReply( GWBUF* queue, DCB* backend_dcb); -static void handleError( - ROUTER* instance, - void* router_session, - char* message, - DCB* backend_dcb, - int action, - bool* succp); +static void handleError( + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp); static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); static int router_get_servercount(ROUTER_INSTANCE* router); @@ -179,9 +179,15 @@ static void rses_property_done( static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); +static bool execute_sescmd_history(backend_ref_t* bref); + static bool execute_sescmd_in_backend( backend_ref_t* backend_ref); +static void sescmd_cursor_reset(sescmd_cursor_t* scur); + +static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur); + static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, bool value); @@ -581,7 +587,10 @@ static void* newSession( /** * Find a backend servers to connect to. + * This command requires that rsession's lock is held. */ + rses_begin_locked_router_action(client_rses); + succp = select_connect_backend_servers(&master_ref, backend_ref, router_nservers, @@ -589,6 +598,8 @@ static void* newSession( client_rses->rses_config.rw_slave_select_criteria, session, router); + + rses_end_locked_router_action(client_rses); /** Both Master and at least 1 slave must be found */ if (!succp) { @@ -689,14 +700,10 @@ static void closeSession( CHK_DCB(dcb); bref_clear_state(&backend_ref[i], BREF_IN_USE); bref_set_state(&backend_ref[i], BREF_CLOSED); -#if defined(ERRHANDLE) /** * closes protocol and dcb */ dcb_close(dcb); -#else - dcb->func.close(dcb); -#endif /** decrease server current connection counters */ atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); @@ -868,6 +875,12 @@ return_succp: * @param queue Gateway buffer queue with the packets received * * @return if succeed 1, otherwise 0 + * If routeQuery fails, it means that router session has failed. + * In any tolerated failure, handleError is called and if necessary, + * an error message is sent to the client. + * + * For now, routeQuery don't tolerate errors, so any error will close + * the session. vraa 14.6.14 */ static int routeQuery( ROUTER* instance, @@ -1466,12 +1479,27 @@ static bool select_connect_backend_servers( if (*p_master_ref != NULL && BREF_IS_IN_USE((*p_master_ref))) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [select_connect_backend_servers] Master %p fd %d found.", + pthread_self(), + (*p_master_ref)->bref_dcb, + (*p_master_ref)->bref_dcb->fd))); + master_found = true; master_connected = true; } /** New session or master failure case */ else { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [select_connect_backend_servers] Didn't find master ", + "for session %p rses %p.", + pthread_self(), + session, + backend_ref))); + master_found = false; master_connected = false; } @@ -1598,6 +1626,15 @@ static bool select_connect_backend_servers( if (backend_ref[i].bref_dcb != NULL) { slaves_connected += 1; + /** + * Start executing session command + * history. + */ + execute_sescmd_history(&backend_ref[i]); + /** + * Callback which is called when + * node fails. + */ dcb_add_callback( backend_ref[i].bref_dcb, DCB_REASON_NOT_RESPONDING, @@ -1641,6 +1678,13 @@ static bool select_connect_backend_servers( if (backend_ref[i].bref_dcb != NULL) { master_connected = true; + + dcb_add_callback( + backend_ref[i].bref_dcb, + DCB_REASON_NOT_RESPONDING, + &router_handle_state_switch, + (void *)&backend_ref[i]); + bref_clear_state(&backend_ref[i], BREF_NOT_USED); bref_set_state(&backend_ref[i], @@ -2131,6 +2175,61 @@ static GWBUF* sescmd_cursor_clone_querybuf( return buf; } +static bool sescmd_cursor_history_empty( + sescmd_cursor_t* scur) +{ + bool succp; + + CHK_SESCMD_CUR(scur); + + if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL) + { + succp = true; + } + else + { + succp = false; + } + + return succp; +} + + +static void sescmd_cursor_reset( + sescmd_cursor_t* scur) +{ + CHK_SESCMD_CUR(scur); + CHK_CLIENT_RSES(scur->scmd_cur_rses); + + CHK_RSES_PROP((*scur->scmd_cur_ptr_property)); + + scur->scmd_cur_active = false; + scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd; +} + +static bool execute_sescmd_history( + backend_ref_t* bref) +{ + bool succp; + sescmd_cursor_t* scur; + CHK_BACKEND_REF(bref); + + scur = &bref->bref_sescmd_cur; + CHK_SESCMD_CUR(scur); + + if (!sescmd_cursor_history_empty(scur)) + { + sescmd_cursor_reset(scur); + succp = execute_sescmd_in_backend(bref); + } + else + { + succp = true; + } + + return succp; +} + /** * If session command cursor is passive, sends the command to backend for * execution. @@ -2540,9 +2639,9 @@ static void rwsplit_process_options( } /** - * Error Handler routine - * - * The routine will handle errors that occurred in backend writes. + * Error Handler routine to resolve backend failures. If it succeeds then there + * are enough operative backends available and connected. Otherwise it fails, + * and session is terminated. * * @param instance The router instance * @param router_session The router session @@ -2555,27 +2654,74 @@ static void rwsplit_process_options( * tell whether router has enough master/slave connections to continue work. */ static void handleError ( - ROUTER *instance, - void *router_session, - char *message, - DCB *backend_dcb, - int action, - bool *succp) + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp) { - DCB* client_dcb = NULL; - SESSION* session = backend_dcb->session; + DCB* client_dcb; + SESSION* session; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; - client_dcb = session->client; - CHK_DCB(client_dcb); + CHK_DCB(backend_dcb); + backend_dcb->dcb_errhandle_called = true; + session = backend_dcb->session; + CHK_SESSION(session); switch (action) { case ERRACT_NEW_CONNECTION: { - int router_nservers; - int max_nslaves; + int router_nservers; + int max_nslaves; + backend_ref_t* bref; + CHK_CLIENT_RSES(rses); + + if (!rses_begin_locked_router_action(rses)) + { + *succp = false; + return; + } + + /** + * Error handler is already called for this DCB because + * it's not polling anymore. It can be assumed that + * it succeed because rses isn't closed. + */ + if (backend_dcb->state != DCB_STATE_POLLING) + { + rses_end_locked_router_action(rses); + *succp = true; + return; + } + + + bref = get_bref_from_dcb(rses, backend_dcb); + CHK_BACKEND_REF(bref); + + if (BREF_IS_WAITING_RESULT(bref)) + { + DCB* client_dcb; + client_dcb = session->client; + client_dcb->func.write(client_dcb, errmsgbuf); + } + bref_clear_state(bref, BREF_IN_USE); + bref_clear_state(bref, BREF_WAITING_RESULT); + bref_set_state(bref, BREF_NOT_USED); + bref_set_state(bref, BREF_CLOSED); + /** + * Remove callback because this DCB won't be used + * unless it is reconnected later, and then the callback + * is set again. + */ + dcb_remove_callback(backend_dcb, + DCB_REASON_NOT_RESPONDING, + &router_handle_state_switch, + (void *)bref); + router_nservers = router_get_servercount(inst); max_nslaves = rses_get_max_slavecount(rses, router_nservers); /** @@ -2590,45 +2736,33 @@ static void handleError ( rses->rses_config.rw_slave_select_criteria, session, inst); - - ss_dassert(*succp); - /** Too few or no slaves at all */ - if (!succp) - { - if (session->state == SESSION_STATE_ROUTER_READY) - { - ROUTER* rsession; - ROUTER_OBJECT* router; - - router = session->service->router; - - spinlock_acquire(&session->ses_lock); - session->state = SESSION_STATE_STOPPING; - spinlock_release(&session->ses_lock); - - rsession = session->router_session; - /*< - * rsession should never be NULL here. - */ - ss_dassert(rsession != NULL); - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_error_backend_event] " - "Call closeSession for backend " - "session.", - pthread_self()))); - - router->closeSession(instance, rsession); - } - - } - } + + rses_end_locked_router_action(rses); break; + } + + case ERRACT_REPLY_CLIENT: + { + session_state_t sesstate; + spinlock_acquire(&session->ses_lock); + sesstate = session->state; + client_dcb = session->client; + spinlock_release(&session->ses_lock); + + if (sesstate == SESSION_STATE_ROUTER_READY) + { + CHK_DCB(client_dcb); + client_dcb->func.write(client_dcb, errmsgbuf); + } + succp = false; /** false because new servers aren's selected. */ + break; + } + default: *succp = false; break; - } + } } static void print_error_packet( @@ -2727,19 +2861,25 @@ static backend_ref_t* get_bref_from_dcb( DCB* dcb) { backend_ref_t* bref; - + int i = 0; CHK_DCB(dcb); CHK_CLIENT_RSES(rses); bref = rses->rses_backend_ref; - while (bref != NULL) + while (irses_nbackends) { if (bref->bref_dcb == dcb) { break; } bref++; + i += 1; + } + + if (i == rses->rses_nbackends) + { + bref = NULL; } return bref; } @@ -2749,30 +2889,37 @@ static int router_handle_state_switch( DCB_REASON reason, void* data) { - backend_ref_t* bref; - int rc = 1; + backend_ref_t* bref; + int rc = 1; + ROUTER_CLIENT_SES* rses; + SESSION* ses; + SERVER* srv; CHK_DCB(dcb); bref = (backend_ref_t *)data; CHK_BACKEND_REF(bref); - if (bref->bref_dcb != dcb) + srv = bref->bref_backend->backend_server; + + if (SERVER_IS_RUNNING(srv) && SERVER_IS_IN_CLUSTER(srv)) { goto return_rc; } + ses = dcb->session; + CHK_SESSION(ses); + + rses = (ROUTER_CLIENT_SES *)dcb->session->router_session; + CHK_CLIENT_RSES(rses); switch (reason) { case DCB_REASON_NOT_RESPONDING: - if (BREF_IS_WAITING_RESULT(bref)) - { - dcb->func.hangup(dcb); - } + dcb->func.hangup(dcb); break; default: break; } + return_rc: return rc; -} - +} \ No newline at end of file