diff --git a/server/core/dcb.c b/server/core/dcb.c index fe5b29590..fd96361c4 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -110,6 +110,7 @@ DCB *rval; #if defined(SS_DEBUG) rval->dcb_chk_top = CHK_NUM_DCB; rval->dcb_chk_tail = CHK_NUM_DCB; + rval->dcb_errhandle_called = false; #endif rval->dcb_role = role; #if 1 @@ -149,7 +150,7 @@ DCB *rval; /** - * Free a DCB that has not been associated with a decriptor. + * Free a DCB that has not been associated with a descriptor. * * @param dcb The DCB to free */ @@ -957,7 +958,6 @@ int above_water; if (dcb->writeq) { int len; - /* * Loop over the buffer chain in the pending writeq * Send as much of the data in that chain as possible and @@ -1042,9 +1042,7 @@ void dcb_close(DCB *dcb) { int rc; -#if defined(ERRHANDLE) - bool isclient; -#endif + CHK_DCB(dcb); /*< @@ -1062,21 +1060,13 @@ dcb_close(DCB *dcb) dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); - - -#if defined(ERRHANDLE) - isclient = dcb_isclient(dcb); + /*< + * Stop dcb's listening and modify state accordingly. + */ + rc = poll_remove_dcb(dcb); - if (isclient) - { - /*< - * Stop dcb's listening and modify state accordingly. - */ - rc = poll_remove_dcb(dcb); - - ss_dassert(dcb->state == DCB_STATE_NOPOLLING || + ss_dassert(dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); - } /** * close protocol and router session */ @@ -1085,26 +1075,6 @@ dcb_close(DCB *dcb) dcb->func.close(dcb); } - if (!isclient) - { - /*< - * Stop dcb's listening and modify state accordingly. - */ - rc = poll_remove_dcb(dcb); - - ss_dassert(dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE); - } -#else - /*< - * Stop dcb's listening and modify state accordingly. - */ - rc = poll_remove_dcb(dcb); - - ss_dassert(dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE); -#endif - dcb_call_callback(dcb, DCB_REASON_CLOSE); if (rc == 0) { @@ -1654,7 +1624,7 @@ int rval = 0; if (cb->reason == reason && cb->cb == callback && cb->userdata == userdata) { - if (pcb == NULL) + if (pcb != NULL) pcb->next = cb->next; else dcb->callbacks = cb->next; diff --git a/server/include/dcb.h b/server/include/dcb.h index dfc97ccce..405fb16f1 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -195,6 +195,7 @@ typedef struct dcb_callback { typedef struct dcb { #if defined(SS_DEBUG) skygw_chk_t dcb_chk_top; + bool dcb_errhandle_called; #endif dcb_role_t dcb_role; SPINLOCK dcb_initlock; diff --git a/server/include/router.h b/server/include/router.h index 14302d230..8f0851091 100644 --- a/server/include/router.h +++ b/server/include/router.h @@ -66,6 +66,12 @@ typedef void *ROUTER; * * @see load_module */ +typedef enum error_action { + ERRACT_NEW_CONNECTION = 0x001, + ERRACT_REPLY_CLIENT = 0x002 +} error_action_t; + + typedef struct router_object { ROUTER *(*createInstance)(SERVICE *service, char **options); void *(*newSession)(ROUTER *instance, SESSION *session); @@ -75,12 +81,12 @@ typedef struct router_object { void (*diagnostics)(ROUTER *instance, DCB *dcb); void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); void (*handleError)( - ROUTER* instance, - void* router_session, - char* message, - DCB *backend_dcb, - int action, - bool* succp); + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp); uint8_t (*getCapabilities)(ROUTER *instance, void* router_session); } ROUTER_OBJECT; @@ -97,10 +103,6 @@ typedef enum router_capability_t { RCAP_TYPE_PACKET_INPUT = (1 << 1) } router_capability_t; -typedef enum error_action { - ERRACT_NEW_CONNECTION = 0x001, - ERRACT_REPLY_CLIENT = 0x002 -} error_action_t; #endif diff --git a/server/include/server.h b/server/include/server.h index b15453c18..d32413bb7 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -117,6 +117,11 @@ typedef struct server { */ #define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT) +/** server is not master, slave or joined */ +#define SERVER_NOT_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) == 0) + +#define SERVER_IS_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) != 0) + extern SERVER *server_alloc(char *, char *, unsigned short); extern int server_free(SERVER *); extern SERVER *server_find_by_unique_name(char *); diff --git a/server/include/session.h b/server/include/session.h index f99982802..a2415c785 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -57,7 +57,7 @@ typedef enum { SESSION_STATE_ALLOC, /*< for all sessions */ SESSION_STATE_READY, /*< for router session */ SESSION_STATE_ROUTER_READY, /*< for router session */ - SESSION_STATE_STOPPING, /*< router is being closed */ + SESSION_STATE_STOPPING, /*< session and router are being closed */ SESSION_STATE_LISTENER, /*< for listener session */ SESSION_STATE_LISTENER_STOPPED, /*< for listener session */ SESSION_STATE_FREE /*< for all sessions */ diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 5f5d30735..bb7338a3e 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -265,6 +265,12 @@ int mysql_send_custom_error ( int packet_number, int in_affected_rows, const char* mysql_message); + +GWBUF* mysql_create_custom_error( + int packet_number, + int affected_rows, + const char* msg); + int gw_send_change_user_to_backend( char *dbname, char *user, diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index f522e7006..2fdc6a97e 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -144,6 +144,7 @@ MYSQL_MONITOR *handle; handle->defaultPasswd = NULL; handle->id = MONITOR_DEFAULT_ID; handle->interval = MONITOR_INTERVAL; + handle->replicationHeartbeat = 0; spinlock_init(&handle->lock); } handle->tid = (THREAD)thread_start(monitorMain, handle); diff --git a/server/modules/protocol/httpd.c b/server/modules/protocol/httpd.c index 7d06264b9..7db1366ad 100644 --- a/server/modules/protocol/httpd.c +++ b/server/modules/protocol/httpd.c @@ -245,7 +245,7 @@ HTTPD_session *client_data = NULL; } /* force the client connecton close */ - dcb->func.close(dcb); + dcb_close(dcb); return 0; } @@ -359,7 +359,6 @@ int n_connect = 0; static int httpd_close(DCB *dcb) { - dcb_close(dcb); return 0; } diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 588443c63..ad5a7ac20 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -371,23 +371,11 @@ static int gw_read_backend_event(DCB *dcb) { spinlock_acquire(&session->ses_lock); session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); - - /** - * rsession shouldn't be NULL since session - * state indicates that it was initialized - * successfully. + /** + * Start terminating the session + * by closing the client. */ - rsession = session->router_session; - ss_dassert(rsession != NULL); - - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_read_backend_event] " - "Call closeSession for backend's " - "router client session.", - pthread_self()))); - /* close router_session */ - router->closeSession(router_instance, rsession); + dcb_close(session->client); rc = 1; goto return_rc; } @@ -432,35 +420,41 @@ static int gw_read_backend_event(DCB *dcb) { /* read available backend data */ rc = dcb_read(dcb, &writebuf); - if (rc < 0) { + if (rc < 0) + { /*< vraa : errorHandle */ /*< * Backend generated EPOLLIN event and if backend has * failed, connection must be closed to avoid backend * dcb from getting hanged. */ -#if defined(ERRHANDLE) - bool succp; + GWBUF* errbuf; + bool succp; /** * - send error for client * - mark failed backend BREF_NOT_USED * - go through all servers and select one according to * the criteria that user specified in the beginning. */ + errbuf = mysql_create_custom_error( + 1, + 0, + "Read from backend failed"); + router->handleError(router_instance, rsession, - "Read from backend failed", + errbuf, dcb, ERRACT_NEW_CONNECTION, &succp); - + if (!succp) { - dcb_close(dcb); + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); } -#else - (dcb->func).close(dcb); -#endif + dcb_close(dcb); rc = 0; goto return_rc; } @@ -469,15 +463,6 @@ static int gw_read_backend_event(DCB *dcb) { rc = 0; goto return_rc; } - - /* Note the gwbuf doesn't have here a valid queue->command - * descriptions as it is a fresh new one! - * We only have the copied value in dcb->command from - * previuos func.write() and this will be used by the - * router->clientReply - * and pass now the gwbuf to the router - */ - /*< * If dcb->session->client is freed already it may be NULL. */ @@ -598,33 +583,6 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) MySQLProtocol *backend_protocol = dcb->protocol; int rc = 0; - ss_dassert(dcb->state == DCB_STATE_POLLING); -#if !defined(ERRHANDLE) - /*< - * Don't write to backend if backend_dcb is not in poll set anymore. - */ - spinlock_acquire(&dcb->dcb_initlock); - - if (dcb->state != DCB_STATE_POLLING) - { - /*< vraa : errorHandle */ - /*< Free buffer memory */ - gwbuf_consume(queue, GWBUF_LENGTH(queue)); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [gw_MySQLWrite_backend] Write to backend failed. " - "Backend dcb %p fd %d is %s.", - pthread_self(), - dcb, - dcb->fd, - STRDCBSTATE(dcb->state)))); - spinlock_release(&dcb->dcb_initlock); - rc = 0; - goto return_rc; - } - spinlock_release(&dcb->dcb_initlock); -#endif spinlock_acquire(&dcb->authlock); /** * Pick action according to state of protocol. @@ -697,88 +655,50 @@ return_rc: } /** - * Backend Error Handling for EPOLLER - * + * Error event handler. + * Create error message, pass it to router's error handler and if error + * handler fails in providing enough backend servers, mark session being + * closed and call DCB close function which triggers closing router session + * and related backends (if any exists. */ static int gw_error_backend_event(DCB *dcb) { - SESSION *session; - void *rsession; - ROUTER_OBJECT *router; - ROUTER *router_instance; - int rc = 0; - + SESSION* session; + void* rsession; + ROUTER_OBJECT* router; + ROUTER* router_instance; + int rc = 0; + GWBUF* errbuf; + bool succp; + CHK_DCB(dcb); session = dcb->session; CHK_SESSION(session); + rsession = session->router_session; + router = session->service->router; + router_instance = session->service->router_instance; - router = session->service->router; - router_instance = session->service->router_instance; + errbuf = mysql_create_custom_error( + 1, + 0, + "Lost connection to backend server."); -#if defined(ERRHANDLE2) router->handleError(router_instance, - rsession, - "Connection to backend server failed", - dcb, - ERRACT_NEW_CONNECTION, - &succp); - - if (!succp) - { - dcb_close(dcb); - } -#else - if (dcb->state != DCB_STATE_POLLING) { - /*< vraa : errorHandle */ - /*< - * if client is not available it needs to be handled in send - * function. Session != NULL, that is known. - */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Writing to backend failed."); - - rc = 0; - } else { - /*< vraa : errorHandle */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Closed backend connection."); - rc = 1; - } - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_error_backend_event] Some error occurred in backend. " - "rc = %d", - pthread_self(), - rc))); - - if (session->state == SESSION_STATE_ROUTER_READY) - { + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + /** There are not required backends available, close session. */ + if (!succp) { spinlock_acquire(&session->ses_lock); session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); - - rsession = session->router_session; - /*< - * rsession should never be NULL here. - */ - ss_dassert(rsession != NULL); - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_error_backend_event] " - "Call closeSession for backend " - "session.", - pthread_self()))); - - router->closeSession(router_instance, rsession); } -#endif - return rc; + dcb_close(dcb); + + return 1; } /* @@ -879,7 +799,11 @@ return_fd: /** - * Hangup routine the backend dcb: it does nothing + * Error event handler. + * Create error message, pass it to router's error handler and if error + * handler fails in providing enough backend servers, mark session being + * closed and call DCB close function which triggers closing router session + * and related backends (if any exists. * * @param dcb The current Backend DCB * @return 1 always @@ -893,6 +817,7 @@ gw_backend_hangup(DCB *dcb) ROUTER* router_instance; int rc = 0; bool succp; + GWBUF* errbuf; CHK_DCB(dcb); session = dcb->session; @@ -901,42 +826,26 @@ gw_backend_hangup(DCB *dcb) router = session->service->router; router_instance = session->service->router_instance; - mysql_send_custom_error( - dcb->session->client, - 1, - 0, + errbuf = mysql_create_custom_error( + 1, + 0, "Lost connection to backend server."); - - /** - * errorHandle : - * - sulje katkennut yhteys - miten? - * - etsi riittävä määrä servereitä - * - jos epäonnistui, sammuta sessio - * - jos onnistui, jatka - * - * Jos sammutetaan : - * - dcb_close - backend->func.close() - */ - - /*< vraa : errorHandle */ - /* - * - - lähetä virheviesti clientille jos odottaa - errorHandle : - - etsi riittävä määrä servereitä - - jos epäonnistui, sammuta sessio - - jos onnistui, jatka - */ - router->handleError(router_instance, - rsession, - "Lost connection to backend server", + + router->handleError(router_instance, + rsession, + errbuf, dcb, ERRACT_NEW_CONNECTION, &succp); - if (succp) { - dcb_close(dcb); + /** There are not required backends available, close session. */ + if (!succp) { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); } + dcb_close(dcb); + return 1; } @@ -948,7 +857,6 @@ gw_backend_hangup(DCB *dcb) static int gw_backend_close(DCB *dcb) { -#if defined(ERRHANDLE) DCB* client_dcb; SESSION* session; GWBUF* quitbuf; @@ -957,15 +865,13 @@ gw_backend_close(DCB *dcb) CHK_DCB(dcb); session = dcb->session; CHK_SESSION(session); - + quitbuf = mysql_create_com_quit(NULL, 0); - + /** Send COM_QUIT to the backend being closed */ mysql_send_com_quit(dcb, 0, quitbuf); - if (session != NULL && - (session->state == SESSION_STATE_ROUTER_READY || - session->state == SESSION_STATE_READY)) + if (session != NULL && session->state == SESSION_STATE_STOPPING) { client_dcb = session->client; @@ -976,9 +882,6 @@ gw_backend_close(DCB *dcb) dcb_close(client_dcb); } } -#else - dcb_close(dcb); -#endif return 1; } @@ -1048,6 +951,8 @@ static int backend_write_delayqueue(DCB *dcb) "Failed to write buffered data to back-end server. " "Buffer was empty or back-end was disconnected during " "operation."); + + dcb->session->state = SESSION_STATE_STOPPING; dcb_close(dcb); } diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 9c6955f7d..5f6c7be0f 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -522,6 +522,12 @@ int gw_read_client_event( if (rc < 0) { + if (dcb->session != NULL) + { + spinlock_acquire(&dcb->session->ses_lock); + dcb->session->state = SESSION_STATE_STOPPING; + spinlock_release(&dcb->session->ses_lock); + } dcb_close(dcb); } nbytes_read = gwbuf_length(read_buffer); @@ -631,11 +637,8 @@ int gw_read_client_event( 2, 0, "failed to create new session"); -#if defined(ERRHANDLE) + dcb_close(dcb); -#else - dcb->func.close(dcb); -#endif } } else @@ -655,11 +658,8 @@ int gw_read_client_event( 2, 0, "Authorization failed"); -#if defined(ERRHANDLE) + dcb_close(dcb); -#else - dcb->func.close(dcb); -#endif } } break; @@ -710,15 +710,17 @@ int gw_read_client_event( "client dcb %p.", pthread_self(), dcb))); -#if defined(ERRHANDLE) /** * close router session and that closes * backends */ + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } dcb_close(dcb); -#else - (dcb->func).close(dcb); -#endif } else { @@ -769,7 +771,6 @@ int gw_read_client_event( /** Route COM_QUIT to backend */ if (mysql_command == '\x01') { -#if defined(ERRHANDLE) /** * Sends COM_QUIT packets since buffer is already * created. A BREF_CLOSED flag is set so dcb_close won't @@ -779,18 +780,11 @@ int gw_read_client_event( /** * Close router session which causes closing of backends. */ + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + dcb_close(dcb); -#else - SESSION_ROUTE_QUERY(session, read_buffer); - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_read_client_event] Routed COM_QUIT to " - "backend. Close client dcb %p", - pthread_self(), - dcb))); - /** close client connection, closes router session too */ - rc = dcb->func.close(dcb); -#endif } else { @@ -818,45 +812,36 @@ int gw_read_client_event( if (rc == 1) { rc = 0; /**< here '0' means success */ } else { -#if defined(ERRHANDLE) - bool succp; + GWBUF* errbuf; + bool succp; + + errbuf = mysql_create_custom_error( + 1, + 0, + "Write to backend failed. Session closed."); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Routing the query failed. " - "Reselecting backends."))); + "Session will be closed."))); router->handleError(router_instance, rsession, - "Write to backend failed.", + errbuf, dcb, - ERRACT_NEW_CONNECTION, + ERRACT_REPLY_CLIENT, &succp); if (!succp) { - - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Reselecting backend " - "servers failed."))); - + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } dcb_close(dcb); } - - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "Reselected backend servers."))); - -#else - mysql_send_custom_error(dcb, - 1, - 0, - "Query routing failed. " - "Connection to backend " - "lost."); - protocol->state = MYSQL_IDLE; -#endif } } goto return_rc; @@ -1314,17 +1299,22 @@ return_rc: static int gw_error_client_event( DCB* dcb) - { +{ int rc; + SESSION* session; CHK_DCB(dcb); -#if defined(ERRHANDLE) + session = dcb->session; + CHK_SESSION(session); + + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } dcb_close(dcb); return 1; -#else - rc = dcb->func.close(dcb); - return rc; -#endif } static int @@ -1361,10 +1351,6 @@ gw_client_close(DCB *dcb) router->closeSession(router_instance, rsession); } -#if !defined(ERRHANDLE) - /** close client DCB */ - dcb_close(dcb); -#endif return 1; } @@ -1379,17 +1365,21 @@ gw_client_close(DCB *dcb) static int gw_client_hangup_event(DCB *dcb) { - int rc; + int rc; + SESSION* session; CHK_DCB(dcb); -#if defined(ERRHANDLE) + session = dcb->session; + CHK_SESSION(session); + + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } dcb_close(dcb); return 1; -#else - rc = dcb->func.close(dcb); - - return rc; -#endif } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 4ab8ad3ea..b546a0a88 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -868,6 +868,78 @@ int mysql_send_com_quit( } +GWBUF* mysql_create_custom_error( + int packet_number, + int affected_rows, + const char* msg) +{ + uint8_t* outbuf = NULL; + uint8_t mysql_payload_size = 0; + uint8_t mysql_packet_header[4]; + uint8_t* mysql_payload = NULL; + uint8_t field_count = 0; + uint8_t mysql_err[2]; + uint8_t mysql_statemsg[6]; + unsigned int mysql_errno = 0; + const char* mysql_error_msg = NULL; + const char* mysql_state = NULL; + + GWBUF* errbuf = NULL; + + mysql_errno = 2003; + mysql_error_msg = "An errorr occurred ..."; + mysql_state = "HY000"; + + field_count = 0xff; + gw_mysql_set_byte2(mysql_err, mysql_errno); + mysql_statemsg[0]='#'; + memcpy(mysql_statemsg+1, mysql_state, 5); + + if (msg != NULL) { + mysql_error_msg = msg; + } + + mysql_payload_size = sizeof(field_count) + + sizeof(mysql_err) + + sizeof(mysql_statemsg) + + strlen(mysql_error_msg); + + /** allocate memory for packet header + payload */ + errbuf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size); + ss_dassert(errbuf != NULL); + + if (errbuf == NULL) + { + return 0; + } + outbuf = GWBUF_DATA(errbuf); + + /** write packet header and packet number */ + gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size); + mysql_packet_header[3] = packet_number; + + /** write header */ + memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header)); + + mysql_payload = outbuf + sizeof(mysql_packet_header); + + /** write field */ + memcpy(mysql_payload, &field_count, sizeof(field_count)); + mysql_payload = mysql_payload + sizeof(field_count); + + /** write errno */ + memcpy(mysql_payload, mysql_err, sizeof(mysql_err)); + mysql_payload = mysql_payload + sizeof(mysql_err); + + /** write sqlstate */ + memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg)); + mysql_payload = mysql_payload + sizeof(mysql_statemsg); + + /** write error message */ + memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg)); + + return errbuf; +} /** * mysql_send_custom_error * @@ -881,79 +953,21 @@ int mysql_send_com_quit( * @return packet length * */ -int -mysql_send_custom_error (DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { - uint8_t *outbuf = NULL; - uint8_t mysql_payload_size = 0; - uint8_t mysql_packet_header[4]; - uint8_t *mysql_payload = NULL; - uint8_t field_count = 0; - uint8_t mysql_err[2]; - uint8_t mysql_statemsg[6]; - unsigned int mysql_errno = 0; - const char *mysql_error_msg = NULL; - const char *mysql_state = NULL; +int mysql_send_custom_error ( + DCB *dcb, + int packet_number, + int in_affected_rows, + const char *mysql_message) +{ + GWBUF* buf; + int nbytes; - GWBUF *buf = NULL; - - if (dcb == NULL || - dcb->state != DCB_STATE_POLLING) - { - return 0; - } - mysql_errno = 2003; - mysql_error_msg = "An errorr occurred ..."; - mysql_state = "HY000"; - - field_count = 0xff; - gw_mysql_set_byte2(mysql_err, mysql_errno); - mysql_statemsg[0]='#'; - memcpy(mysql_statemsg+1, mysql_state, 5); - - if (mysql_message != NULL) { - mysql_error_msg = mysql_message; - } - - mysql_payload_size = sizeof(field_count) + sizeof(mysql_err) + sizeof(mysql_statemsg) + strlen(mysql_error_msg); - - // allocate memory for packet header + payload - buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size); - ss_dassert(buf != NULL); + buf = mysql_create_custom_error(dcb, in_affected_rows, mysql_message); - if (buf == NULL) - { - return 0; - } - outbuf = GWBUF_DATA(buf); - - // write packet header with packet number - gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size); - mysql_packet_header[3] = packet_number; - - // write header - memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header)); - - mysql_payload = outbuf + sizeof(mysql_packet_header); - - // write field - memcpy(mysql_payload, &field_count, sizeof(field_count)); - mysql_payload = mysql_payload + sizeof(field_count); - - // write errno - memcpy(mysql_payload, mysql_err, sizeof(mysql_err)); - mysql_payload = mysql_payload + sizeof(mysql_err); - - // write sqlstate - memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg)); - mysql_payload = mysql_payload + sizeof(mysql_statemsg); - - // write err messg - memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg)); - - // writing data in the Client buffer queue + nbytes = GWBUF_LENGTH(buf); dcb->func.write(dcb, buf); - return sizeof(mysql_packet_header) + mysql_payload_size; + return GWBUF_LENGTH(buf); } /** diff --git a/server/modules/protocol/telnetd.c b/server/modules/protocol/telnetd.c index 86e98f397..e8e8ec7c0 100644 --- a/server/modules/protocol/telnetd.c +++ b/server/modules/protocol/telnetd.c @@ -343,8 +343,7 @@ TELNETD *telnetd = dcb->protocol; if (telnetd && telnetd->username) free(telnetd->username); - dcb_close(dcb); - return 0; + return 0; } /** diff --git a/server/modules/routing/GaleraHACRoute.c b/server/modules/routing/GaleraHACRoute.c index 4f143143d..33a9597cc 100644 --- a/server/modules/routing/GaleraHACRoute.c +++ b/server/modules/routing/GaleraHACRoute.c @@ -491,7 +491,7 @@ DCB* backend_dcb; */ if (backend_dcb != NULL) { CHK_DCB(backend_dcb); - backend_dcb->func.close(backend_dcb); + dcb_close(backend_dcb); } } } diff --git a/server/modules/routing/debugcli.c b/server/modules/routing/debugcli.c index bf2404c08..e58a25163 100644 --- a/server/modules/routing/debugcli.c +++ b/server/modules/routing/debugcli.c @@ -295,7 +295,7 @@ CLI_SESSION *session = (CLI_SESSION *)router_session; if (execute_cmd(session)) dcb_printf(session->session->client, "MaxScale> "); else - session->session->client->func.close(session->session->client); + dcb_close(session->session->client); } return 1; } diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 8ca0d0b78..bc6af395a 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -552,7 +552,7 @@ DCB* backend_dcb; */ if (backend_dcb != NULL) { CHK_DCB(backend_dcb); - backend_dcb->func.close(backend_dcb); + dcb_close(backend_dcb); } } } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index f18f0ac18..9e7ef2cc0 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -82,13 +82,13 @@ static void clientReply( GWBUF* queue, DCB* backend_dcb); -static void handleError( - ROUTER* instance, - void* router_session, - char* message, - DCB* backend_dcb, - int action, - bool* succp); +static void handleError( + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp); static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); static int router_get_servercount(ROUTER_INSTANCE* router); @@ -179,9 +179,15 @@ static void rses_property_done( static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); +static bool execute_sescmd_history(backend_ref_t* bref); + static bool execute_sescmd_in_backend( backend_ref_t* backend_ref); +static void sescmd_cursor_reset(sescmd_cursor_t* scur); + +static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur); + static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, bool value); @@ -581,7 +587,10 @@ static void* newSession( /** * Find a backend servers to connect to. + * This command requires that rsession's lock is held. */ + rses_begin_locked_router_action(client_rses); + succp = select_connect_backend_servers(&master_ref, backend_ref, router_nservers, @@ -589,6 +598,8 @@ static void* newSession( client_rses->rses_config.rw_slave_select_criteria, session, router); + + rses_end_locked_router_action(client_rses); /** Both Master and at least 1 slave must be found */ if (!succp) { @@ -689,14 +700,10 @@ static void closeSession( CHK_DCB(dcb); bref_clear_state(&backend_ref[i], BREF_IN_USE); bref_set_state(&backend_ref[i], BREF_CLOSED); -#if defined(ERRHANDLE) /** * closes protocol and dcb */ dcb_close(dcb); -#else - dcb->func.close(dcb); -#endif /** decrease server current connection counters */ atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); @@ -868,6 +875,12 @@ return_succp: * @param queue Gateway buffer queue with the packets received * * @return if succeed 1, otherwise 0 + * If routeQuery fails, it means that router session has failed. + * In any tolerated failure, handleError is called and if necessary, + * an error message is sent to the client. + * + * For now, routeQuery don't tolerate errors, so any error will close + * the session. vraa 14.6.14 */ static int routeQuery( ROUTER* instance, @@ -1466,12 +1479,27 @@ static bool select_connect_backend_servers( if (*p_master_ref != NULL && BREF_IS_IN_USE((*p_master_ref))) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [select_connect_backend_servers] Master %p fd %d found.", + pthread_self(), + (*p_master_ref)->bref_dcb, + (*p_master_ref)->bref_dcb->fd))); + master_found = true; master_connected = true; } /** New session or master failure case */ else { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [select_connect_backend_servers] Didn't find master ", + "for session %p rses %p.", + pthread_self(), + session, + backend_ref))); + master_found = false; master_connected = false; } @@ -1598,6 +1626,15 @@ static bool select_connect_backend_servers( if (backend_ref[i].bref_dcb != NULL) { slaves_connected += 1; + /** + * Start executing session command + * history. + */ + execute_sescmd_history(&backend_ref[i]); + /** + * Callback which is called when + * node fails. + */ dcb_add_callback( backend_ref[i].bref_dcb, DCB_REASON_NOT_RESPONDING, @@ -1641,6 +1678,13 @@ static bool select_connect_backend_servers( if (backend_ref[i].bref_dcb != NULL) { master_connected = true; + + dcb_add_callback( + backend_ref[i].bref_dcb, + DCB_REASON_NOT_RESPONDING, + &router_handle_state_switch, + (void *)&backend_ref[i]); + bref_clear_state(&backend_ref[i], BREF_NOT_USED); bref_set_state(&backend_ref[i], @@ -2131,6 +2175,61 @@ static GWBUF* sescmd_cursor_clone_querybuf( return buf; } +static bool sescmd_cursor_history_empty( + sescmd_cursor_t* scur) +{ + bool succp; + + CHK_SESCMD_CUR(scur); + + if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL) + { + succp = true; + } + else + { + succp = false; + } + + return succp; +} + + +static void sescmd_cursor_reset( + sescmd_cursor_t* scur) +{ + CHK_SESCMD_CUR(scur); + CHK_CLIENT_RSES(scur->scmd_cur_rses); + + CHK_RSES_PROP((*scur->scmd_cur_ptr_property)); + + scur->scmd_cur_active = false; + scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd; +} + +static bool execute_sescmd_history( + backend_ref_t* bref) +{ + bool succp; + sescmd_cursor_t* scur; + CHK_BACKEND_REF(bref); + + scur = &bref->bref_sescmd_cur; + CHK_SESCMD_CUR(scur); + + if (!sescmd_cursor_history_empty(scur)) + { + sescmd_cursor_reset(scur); + succp = execute_sescmd_in_backend(bref); + } + else + { + succp = true; + } + + return succp; +} + /** * If session command cursor is passive, sends the command to backend for * execution. @@ -2540,9 +2639,9 @@ static void rwsplit_process_options( } /** - * Error Handler routine - * - * The routine will handle errors that occurred in backend writes. + * Error Handler routine to resolve backend failures. If it succeeds then there + * are enough operative backends available and connected. Otherwise it fails, + * and session is terminated. * * @param instance The router instance * @param router_session The router session @@ -2555,27 +2654,74 @@ static void rwsplit_process_options( * tell whether router has enough master/slave connections to continue work. */ static void handleError ( - ROUTER *instance, - void *router_session, - char *message, - DCB *backend_dcb, - int action, - bool *succp) + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp) { - DCB* client_dcb = NULL; - SESSION* session = backend_dcb->session; + DCB* client_dcb; + SESSION* session; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; - client_dcb = session->client; - CHK_DCB(client_dcb); + CHK_DCB(backend_dcb); + backend_dcb->dcb_errhandle_called = true; + session = backend_dcb->session; + CHK_SESSION(session); switch (action) { case ERRACT_NEW_CONNECTION: { - int router_nservers; - int max_nslaves; + int router_nservers; + int max_nslaves; + backend_ref_t* bref; + CHK_CLIENT_RSES(rses); + + if (!rses_begin_locked_router_action(rses)) + { + *succp = false; + return; + } + + /** + * Error handler is already called for this DCB because + * it's not polling anymore. It can be assumed that + * it succeed because rses isn't closed. + */ + if (backend_dcb->state != DCB_STATE_POLLING) + { + rses_end_locked_router_action(rses); + *succp = true; + return; + } + + + bref = get_bref_from_dcb(rses, backend_dcb); + CHK_BACKEND_REF(bref); + + if (BREF_IS_WAITING_RESULT(bref)) + { + DCB* client_dcb; + client_dcb = session->client; + client_dcb->func.write(client_dcb, errmsgbuf); + } + bref_clear_state(bref, BREF_IN_USE); + bref_clear_state(bref, BREF_WAITING_RESULT); + bref_set_state(bref, BREF_NOT_USED); + bref_set_state(bref, BREF_CLOSED); + /** + * Remove callback because this DCB won't be used + * unless it is reconnected later, and then the callback + * is set again. + */ + dcb_remove_callback(backend_dcb, + DCB_REASON_NOT_RESPONDING, + &router_handle_state_switch, + (void *)bref); + router_nservers = router_get_servercount(inst); max_nslaves = rses_get_max_slavecount(rses, router_nservers); /** @@ -2590,45 +2736,33 @@ static void handleError ( rses->rses_config.rw_slave_select_criteria, session, inst); - - ss_dassert(*succp); - /** Too few or no slaves at all */ - if (!succp) - { - if (session->state == SESSION_STATE_ROUTER_READY) - { - ROUTER* rsession; - ROUTER_OBJECT* router; - - router = session->service->router; - - spinlock_acquire(&session->ses_lock); - session->state = SESSION_STATE_STOPPING; - spinlock_release(&session->ses_lock); - - rsession = session->router_session; - /*< - * rsession should never be NULL here. - */ - ss_dassert(rsession != NULL); - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_error_backend_event] " - "Call closeSession for backend " - "session.", - pthread_self()))); - - router->closeSession(instance, rsession); - } - - } - } + + rses_end_locked_router_action(rses); break; + } + + case ERRACT_REPLY_CLIENT: + { + session_state_t sesstate; + spinlock_acquire(&session->ses_lock); + sesstate = session->state; + client_dcb = session->client; + spinlock_release(&session->ses_lock); + + if (sesstate == SESSION_STATE_ROUTER_READY) + { + CHK_DCB(client_dcb); + client_dcb->func.write(client_dcb, errmsgbuf); + } + succp = false; /** false because new servers aren's selected. */ + break; + } + default: *succp = false; break; - } + } } static void print_error_packet( @@ -2727,19 +2861,25 @@ static backend_ref_t* get_bref_from_dcb( DCB* dcb) { backend_ref_t* bref; - + int i = 0; CHK_DCB(dcb); CHK_CLIENT_RSES(rses); bref = rses->rses_backend_ref; - while (bref != NULL) + while (irses_nbackends) { if (bref->bref_dcb == dcb) { break; } bref++; + i += 1; + } + + if (i == rses->rses_nbackends) + { + bref = NULL; } return bref; } @@ -2749,30 +2889,37 @@ static int router_handle_state_switch( DCB_REASON reason, void* data) { - backend_ref_t* bref; - int rc = 1; + backend_ref_t* bref; + int rc = 1; + ROUTER_CLIENT_SES* rses; + SESSION* ses; + SERVER* srv; CHK_DCB(dcb); bref = (backend_ref_t *)data; CHK_BACKEND_REF(bref); - if (bref->bref_dcb != dcb) + srv = bref->bref_backend->backend_server; + + if (SERVER_IS_RUNNING(srv) && SERVER_IS_IN_CLUSTER(srv)) { goto return_rc; } + ses = dcb->session; + CHK_SESSION(ses); + + rses = (ROUTER_CLIENT_SES *)dcb->session->router_session; + CHK_CLIENT_RSES(rses); switch (reason) { case DCB_REASON_NOT_RESPONDING: - if (BREF_IS_WAITING_RESULT(bref)) - { - dcb->func.hangup(dcb); - } + dcb->func.hangup(dcb); break; default: break; } + return_rc: return rc; -} - +} \ No newline at end of file