From 889bdd4f8c5270d60967e02f91d81bb84dd58289 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Sun, 8 Jun 2014 19:36:12 +0300 Subject: [PATCH] In dcb.c:dcb_close DCB is removed either before or after the call dcb->func.close. Since mysql backend protocol sends COM_QUIT and thus, writes to backend DCB, it is kept in DCB_STATE_POLLING until the write is completed. dcb.h: define ERRHAND temporarily since changes are still behind that macro Defined two error handling actions in router.h: ERRACT_NEW_COMMECTION and ERRACT_REPLY_CLIENT. Failed database is logged at expanding frequence to error and to message log due changes in mysql_mon.c. Added two new members in MONITOR_SERVERS: mon_err_count, and mon_prev_status so that each backend can be treated individually. Error handling: if mysql_backend.c:dcb_read fails, router's handleError is called instead of closing session. If mysql_client.c:SESSION_ROUTE_QUERY fails router's handleError is called instead of sending error to client. readwritesplit.c:select_connect_backend_servers is modified so that in can be called during active router session. When called, it attempts to find one master and maximum number of configured slaves in correct state if necessary. When handleError needs to replace failed unit it now calls select_connect_backend_servers. --- server/core/dcb.c | 40 +++- server/include/dcb.h | 2 +- server/include/router.h | 6 + server/modules/monitor/mysql_mon.c | 87 ++++++-- server/modules/monitor/mysqlmon.h | 2 + server/modules/protocol/mysql_backend.c | 38 +++- server/modules/protocol/mysql_client.c | 38 ++-- .../routing/readwritesplit/readwritesplit.c | 206 ++++++++++++------ 8 files changed, 298 insertions(+), 121 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index 351f7e83a..bab2f87fe 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1027,6 +1027,9 @@ void dcb_close(DCB *dcb) { int rc; +#if defined(ERRHANDLE) + bool isclient; +#endif CHK_DCB(dcb); /*< @@ -1044,15 +1047,21 @@ dcb_close(DCB *dcb) dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); - /*< - * Stop dcb's listening and modify state accordingly. - */ - rc = poll_remove_dcb(dcb); - ss_dassert(dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE); #if defined(ERRHANDLE) + isclient = dcb_isclient(dcb); + + if (isclient) + { + /*< + * Stop dcb's listening and modify state accordingly. + */ + rc = poll_remove_dcb(dcb); + + ss_dassert(dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE); + } /** * close protocol and router session */ @@ -1060,6 +1069,25 @@ dcb_close(DCB *dcb) { dcb->func.close(dcb); } + + if (!isclient) + { + /*< + * Stop dcb's listening and modify state accordingly. + */ + rc = poll_remove_dcb(dcb); + + ss_dassert(dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE); + } +#else + /*< + * Stop dcb's listening and modify state accordingly. + */ + rc = poll_remove_dcb(dcb); + + ss_dassert(dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE); #endif dcb_call_callback(dcb, DCB_REASON_CLOSE); diff --git a/server/include/dcb.h b/server/include/dcb.h index 08c383014..0672d5185 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -24,7 +24,7 @@ #include #include -// #define ERRHANDLE +#define ERRHANDLE struct session; struct server; diff --git a/server/include/router.h b/server/include/router.h index ded9428a0..14302d230 100644 --- a/server/include/router.h +++ b/server/include/router.h @@ -97,4 +97,10 @@ typedef enum router_capability_t { RCAP_TYPE_PACKET_INPUT = (1 << 1) } router_capability_t; +typedef enum error_action { + ERRACT_NEW_CONNECTION = 0x001, + ERRACT_REPLY_CLIENT = 0x002 +} error_action_t; + + #endif diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 84b489fb3..f39412684 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -73,6 +73,8 @@ static void diagnostics(DCB *, void *); static void setInterval(void *, unsigned long); static void defaultId(void *, unsigned long); static void replicationHeartbeat(void *, int); +static bool mon_status_changed(MONITOR_SERVERS* mon_srv); +static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv); static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat }; @@ -180,7 +182,10 @@ MONITOR_SERVERS *ptr, *db; db->server = server; db->con = NULL; db->next = NULL; + db->mon_err_count = 0; + db->mon_prev_status = 0; spinlock_acquire(&handle->lock); + if (handle->databases == NULL) handle->databases = db; else @@ -310,15 +315,17 @@ monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) MYSQL_ROW row; MYSQL_RES *result; int num_fields; -int ismaster = 0, isslave = 0; -char *uname = handle->defaultUser, *passwd = handle->defaultPasswd; +int ismaster = 0; +int isslave = 0; +char *uname = handle->defaultUser; +char *passwd = handle->defaultPasswd; unsigned long int server_version = 0; char *server_string; unsigned long id = handle->id; int replication_heartbeat = handle->replicationHeartbeat; static int conn_err_count; - if (database->server->monuser != NULL) + if (database->server->monuser != NULL) { uname = database->server->monuser; passwd = database->server->monpw; @@ -331,6 +338,9 @@ static int conn_err_count; if (SERVER_IN_MAINT(database->server)) return; + /** Store prevous status */ + database->mon_prev_status = database->server->status; + if (database->con == NULL || mysql_ping(database->con) != 0) { char *dpwd = decryptPassword(passwd); @@ -338,6 +348,7 @@ static int conn_err_count; int read_timeout = 1; database->con = mysql_init(NULL); + rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); if (mysql_real_connect(database->con, @@ -349,7 +360,9 @@ static int conn_err_count; NULL, 0) == NULL) { - if (conn_err_count%10 == 0) + free(dpwd); + + if (mon_print_fail_status(database)) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -359,17 +372,15 @@ static int conn_err_count; database->server->port, mysql_error(database->con)))); } - conn_err_count += 1; - free(dpwd); + /** Store current status */ server_clear_status(database->server, SERVER_RUNNING); return; } free(dpwd); - } - - /* If we get this far then we have a working connection */ - server_set_status(database->server, SERVER_RUNNING); + } + /** Store current status */ + server_set_status(database->server, SERVER_RUNNING); /* get server version from current server */ server_version = mysql_get_server_version(database->con); @@ -629,7 +640,7 @@ static int conn_err_count; } } } - + /** Store current status */ if (ismaster) { server_set_status(database->server, SERVER_MASTER); @@ -657,7 +668,6 @@ monitorMain(void *arg) { MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MONITOR_SERVERS *ptr; -static int err_count; if (mysql_thread_init()) { @@ -680,13 +690,10 @@ static int err_count; ptr = handle->databases; while (ptr) { - unsigned int prev_status = ptr->server->status; - monitorDatabase(handle, ptr); - - if (ptr->server->status != prev_status || - (SERVER_IS_DOWN(ptr->server) && - err_count%10 == 0)) + + if (mon_status_changed(ptr) || + mon_print_fail_status(ptr)) { LOGIF(LM, (skygw_log_write_flush( LOGFILE_MESSAGE, @@ -697,7 +704,13 @@ static int err_count; } if (SERVER_IS_DOWN(ptr->server)) { - err_count += 1; + /** Increase this server'e error count */ + ptr->mon_err_count += 1; + } + else + { + /** Reset this server's error count */ + ptr->mon_err_count = 0; } ptr = ptr->next; } @@ -743,3 +756,39 @@ replicationHeartbeat(void *arg, int replicationHeartbeat) MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int)); } + +static bool mon_status_changed( + MONITOR_SERVERS* mon_srv) +{ + bool succp; + + if (mon_srv->mon_prev_status != mon_srv->server->status) + { + succp = true; + } + else + { + succp = false; + } + return succp; +} + +static bool mon_print_fail_status( + MONITOR_SERVERS* mon_srv) +{ + bool succp; + int errcount = mon_srv->mon_err_count; + uint8_t modval; + + modval = 1<<(MIN(errcount/10, 7)); + + if (SERVER_IS_DOWN(mon_srv->server) && errcount%modval == 0) + { + succp = true; + } + else + { + succp = false; + } + return succp; +} \ No newline at end of file diff --git a/server/modules/monitor/mysqlmon.h b/server/modules/monitor/mysqlmon.h index 8f5bcd704..5b5c7d04a 100644 --- a/server/modules/monitor/mysqlmon.h +++ b/server/modules/monitor/mysqlmon.h @@ -42,6 +42,8 @@ typedef struct monitor_servers { SERVER *server; /**< The server being monitored */ MYSQL *con; /**< The MySQL connection */ + int mon_err_count; + unsigned int mon_prev_status; struct monitor_servers *next; /**< The next server in the list */ } MONITOR_SERVERS; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index ccf9ebd46..b77671143 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -65,7 +65,9 @@ static int gw_backend_hangup(DCB *dcb); static int backend_write_delayqueue(DCB *dcb); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); -static int gw_session(DCB *backend_dcb, void *data); +#if defined(NOT_USED) + static int gw_session(DCB *backend_dcb, void *data); +#endif static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb); static GWPROTOCOL MyObject = { @@ -401,8 +403,12 @@ static int gw_read_backend_event(DCB *dcb) { SESSION *session = dcb->session; CHK_SESSION(session); - /* read available backend data */ - rc = dcb_read(dcb, &writebuf); + router = session->service->router; + router_instance = session->service->router_instance; + rsession = session->router_session; + + /* read available backend data */ + rc = dcb_read(dcb, &writebuf); if (rc < 0) { /*< vraa : errorHandle */ @@ -412,7 +418,24 @@ static int gw_read_backend_event(DCB *dcb) { * dcb from getting hanged. */ #if defined(ERRHANDLE) - dcb_close(dcb); + bool succp; + /** + * - send error for client + * - mark failed backend BREF_NOT_USED + * - go through all servers and select one according to + * the criteria that user specified in the beginning. + */ + router->handleError(router_instance, + rsession, + "Read from backend failed.", + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + if (!succp) + { + dcb_close(dcb); + } #else (dcb->func).close(dcb); #endif @@ -424,9 +447,6 @@ static int gw_read_backend_event(DCB *dcb) { rc = 0; goto return_rc; } - router = session->service->router; - router_instance = session->service->router_instance; - rsession = session->router_session; /* Note the gwbuf doesn't have here a valid queue->command * descriptions as it is a fresh new one! @@ -671,6 +691,9 @@ static int gw_error_backend_event(DCB *dcb) { router = session->service->router; router_instance = session->service->router_instance; +#if defined(ERRHANDLE2) + router->handleError(); +#else if (dcb->state != DCB_STATE_POLLING) { /*< vraa : errorHandle */ /*< @@ -720,6 +743,7 @@ static int gw_error_backend_event(DCB *dcb) { router->closeSession(router_instance, rsession); } +#endif return rc; } diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 60bca0afd..d99b23e18 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -804,7 +804,6 @@ int gw_read_client_event(DCB* dcb) { dcb_close(dcb); #else SESSION_ROUTE_QUERY(session, read_buffer); -// router->routeQuery(router_instance, rsession, read_buffer); LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, "%lu [gw_read_client_event] Routed COM_QUIT to " @@ -824,6 +823,7 @@ int gw_read_client_event(DCB* dcb) { * to router. */ rc = route_by_statement(session, read_buffer); + if (read_buffer != NULL) { /** add incomplete mysql packet to read queue */ @@ -840,7 +840,7 @@ int gw_read_client_event(DCB* dcb) { if (rc == 1) { rc = 0; /**< here '0' means success */ } else { -#if defined(ERRHANDLE2) +#if defined(ERRHANDLE) bool succp; LOGIF(LE, (skygw_log_write_flush( @@ -848,40 +848,28 @@ int gw_read_client_event(DCB* dcb) { "Error : Routing the query failed. " "Reselecting backends."))); - /** - * Decide whether close router and its - * connections or just send an error to client - */ router->handleError(router_instance, - rsession, - "Query routing failed. " - "Query execution aborted. " - "Reselecting backend.", - NULL, - ERRACT_RELECT_BACKENDS, + rsession, + "Write to backend failed.", + dcb, + ERRACT_NEW_CONNECTION, &succp); if (!succp) { - router->handleError(router_instance, - rsession, - "Connection to " - "backend lost.", - NULL, - ERRACT_CLOSE_RSES, - NULL); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Reselecting backend " "servers failed."))); + + dcb_close(dcb); } - else - { - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "Reselected backend servers."))); - } + + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "Reselected backend servers."))); + #else mysql_send_custom_error(dcb, 1, diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 0e85e9077..3e521e5cf 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -91,6 +91,8 @@ static void handleError( bool* succp); static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); +static int router_get_servercount(ROUTER_INSTANCE* router); +static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers); static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -425,14 +427,12 @@ static void* newSession( SESSION* session) { backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */ - backend_ref_t* master_ref = NULL; /*< pointer to selected master */ - BACKEND** b; + backend_ref_t* master_ref = NULL; /*< pointer to selected master */ ROUTER_CLIENT_SES* client_rses = NULL; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; int router_nservers = 0; /*< # of servers in total */ int max_nslaves; /*< max # of slaves used in this session */ - int conf_max_nslaves; /*< value from configuration file */ int i; const int min_nservers = 1; /*< hard-coded for now */ static uint64_t router_client_ses_seq; /*< ID for client session */ @@ -478,9 +478,7 @@ static void* newSession( client_rses->rses_autocommit_enabled = true; client_rses->rses_transaction_active = false; - /** count servers */ - b = router->servers; - while (*(b++) != NULL) router_nservers++; + router_nservers = router_get_servercount(router); /** With too few servers session is not created */ if (router_nservers < min_nservers || @@ -558,6 +556,7 @@ static void* newSession( backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; #endif + backend_ref[i].bref_state = BREF_NOT_USED; backend_ref[i].bref_backend = router->servers[i]; /** store pointers to sescmd list to both cursors */ backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; @@ -565,22 +564,8 @@ static void* newSession( backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; - } - /** - * Find out the number of read backend servers. - * Depending on the configuration value type, either copy direct count - * of slave connections or calculate the count from percentage value. - */ - if (client_rses->rses_config.rw_max_slave_conn_count > 0) - { - conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count; - } - else - { - conf_max_nslaves = - (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100; - } - max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + } + max_nslaves = rses_get_max_slavecount(client_rses, router_nservers); spinlock_init(&client_rses->rses_lock); client_rses->rses_backend_ref = backend_ref; @@ -770,7 +755,10 @@ static void freeSession( return; } - +/** + * Provide a pointer to a suitable backend dcb. + * Detect failures in server statuses and reselect backends if necessary. + */ static bool get_dcb( DCB** p_dcb, ROUTER_CLIENT_SES* rses, @@ -1401,10 +1389,10 @@ static bool select_connect_backend_servers( select_criteria_t select_criteria, SESSION* session, ROUTER_INSTANCE* router) -{ +{ bool succp = true; - bool master_found = false; - bool master_connected = false; + bool master_found; + bool master_connected; int slaves_found = 0; int slaves_connected = 0; int i; @@ -1418,6 +1406,20 @@ static bool select_connect_backend_servers( succp = false; goto return_succp; } + + /** Master is already chosen and connected. This is slave failure case */ + if (*p_master_ref != NULL && + (*p_master_ref)->bref_state == BREF_IN_USE) + { + master_found = true; + master_connected = true; + } + /** New session or master failure case */ + else + { + master_found = false; + master_connected = false; + } /** Check slave selection criteria and set compare function */ p = criteria_cmpfun[select_criteria]; @@ -1502,7 +1504,7 @@ static bool select_connect_backend_servers( i++) { BACKEND* b = backend_ref[i].bref_backend; - + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Examine server " @@ -1514,6 +1516,7 @@ static bool select_connect_backend_servers( b->backend_conn_count, router->bitmask))); + if (SERVER_IS_RUNNING(b->backend_server) && ((b->backend_server->status & router->bitmask) == router->bitvalue)) @@ -1522,33 +1525,43 @@ static bool select_connect_backend_servers( SERVER_IS_SLAVE(b->backend_server)) { slaves_found += 1; - backend_ref[i].bref_dcb = dcb_connect( - b->backend_server, - session, - b->backend_server->protocol); - if (backend_ref[i].bref_dcb != NULL) + /** Slave is already connected */ + if (backend_ref[i].bref_state == BREF_IN_USE) { slaves_connected += 1; - backend_ref[i].bref_state = BREF_IN_USE; - /** - * Increase backend connection counter. - * Server's stats are _increased_ in - * dcb.c:dcb_alloc ! - * But decreased in the calling function - * of dcb_close. - */ - atomic_add(&b->backend_conn_count, 1); } + /** New slave connection is taking place */ else { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Unable to establish " - "connection with slave %s:%d", - b->backend_server->name, - b->backend_server->port))); - /* handle connect error */ + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, + session, + b->backend_server->protocol); + + if (backend_ref[i].bref_dcb != NULL) + { + slaves_connected += 1; + backend_ref[i].bref_state = BREF_IN_USE; + /** + * Increase backend connection counter. + * Server's stats are _increased_ in + * dcb.c:dcb_alloc ! + * But decreased in the calling function + * of dcb_close. + */ + atomic_add(&b->backend_conn_count, 1); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with slave %s:%d", + b->backend_server->name, + b->backend_server->port))); + /* handle connect error */ + } } } else if (!master_connected && @@ -1584,18 +1597,18 @@ static bool select_connect_backend_servers( /* handle connect error */ } } - else - { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Unable to establish " - "connection with server %s:%d, %s", - b->backend_server->name, - b->backend_server->port, - STRSRVSTATUS(b->backend_server)))); - /* handle connect error */ - } + } + else + { + succp = false; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with server %s:%d, %s", + b->backend_server->name, + b->backend_server->port, + STRSRVSTATUS(b->backend_server)))); + /* handle connect error */ } } /*< for */ @@ -2496,12 +2509,40 @@ static void handleError ( int action, bool *succp) { - DCB* client_dcb = NULL; - SESSION* session = backend_dcb->session; + DCB* client_dcb = NULL; + SESSION* session = backend_dcb->session; + ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)instance; + ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; client_dcb = session->client; + CHK_DCB(client_dcb); - ss_dassert(client_dcb != NULL); + switch (action) { + case ERRACT_NEW_CONNECTION: + { + int router_nservers; + int max_nslaves; + + router_nservers = router_get_servercount(router); + max_nslaves = rses_get_max_slavecount(rses, router_nservers); + + *succp = select_connect_backend_servers( + &rses->rses_master_ref, + rses->rses_backend_ref, + router_nservers, + max_nslaves, + rses->rses_config.rw_slave_select_criteria, + session, + router); + + ss_dassert(*succp); + } + break; + + default: + *succp = false; + break; + } } static void print_error_packet( @@ -2555,3 +2596,42 @@ static void print_error_packet( while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL); } } + +static int router_get_servercount( + ROUTER_INSTANCE* router) +{ + int router_nservers = 0; + BACKEND** b = router->servers; + /** count servers */ + while (*(b++) != NULL) router_nservers++; + + return router_nservers; +} + +/** + * Find out the number of read backend servers. + * Depending on the configuration value type, either copy direct count + * of slave connections or calculate the count from percentage value. + */ +static int rses_get_max_slavecount( + ROUTER_CLIENT_SES* rses, + int router_nservers) +{ + int conf_max_nslaves; + int max_nslaves; + + CHK_CLIENT_RSES(rses); + + if (rses->rses_config.rw_max_slave_conn_count > 0) + { + conf_max_nslaves = rses->rses_config.rw_max_slave_conn_count; + } + else + { + conf_max_nslaves = + (router_nservers*rses->rses_config.rw_max_slave_conn_percent)/100; + } + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + + return max_nslaves; +} \ No newline at end of file