diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index c8c119da5..5aef7aea6 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -40,6 +40,7 @@ * 29/06/2015 Massimiliano Pinto Addition of master.ini for easy startup configuration * If not found router goes into BLRM_UNCONFIGURED state. * Cache dir is 'cache' under router->binlogdir. + * 07/08/2015 Massimiliano Pinto Addition of binlog check at startup if trx_safe is on * * @endverbatim */ @@ -99,9 +100,11 @@ static int blr_handle_config_item(const char *name, const char *value, ROUTER_IN static int blr_set_service_mysql_user(SERVICE *service); int blr_load_dbusers(ROUTER_INSTANCE *router); int blr_save_dbusers(ROUTER_INSTANCE *router); +static int blr_check_binlog(ROUTER_INSTANCE *router); extern void blr_cache_read_master_data(ROUTER_INSTANCE *router); extern char *decryptPassword(char *crypt); extern char *create_hex_sha1_sha1_passwd(char *passwd); +extern int blr_read_events_all_events(ROUTER_INSTANCE *router, int fix, int debug); /** The module object definition */ static ROUTER_OBJECT MyObject = { @@ -533,34 +536,28 @@ int rc = 0; " Fix errors in it or configure with CHANGE MASTER TO ...", inst->service->name, inst->binlogdir))); } + + /* Set service user or load db users */ + blr_set_service_mysql_user(inst->service); } else { - /** - * master.ini configuration is ok - * Master registration can start - */ inst->master_state = BLRM_UNCONNECTED; master_info = 1; - } - /* Set service user or load db users */ - if (inst->master_state == BLRM_UNCONFIGURED) { - blr_set_service_mysql_user(inst->service); - } else { + /* Try loading dbusers */ blr_load_dbusers(inst); } - /* - * Read any cached response messages - */ - - blr_cache_read_master_data(inst); - - /* - * Initialise the binlog file and position + /** + * Initialise the binlog router */ if (inst->master_state == BLRM_UNCONNECTED) { + + /* Read any cached response messages */ + blr_cache_read_master_data(inst); + + /* Find latest binlog file or create a new one (000001) */ if (blr_file_init(inst) == 0) { LOGIF(LE, (skygw_log_write_flush( @@ -583,7 +580,7 @@ int rc = 0; } } - /* + /** * We have completed the creation of the instance data, so now * insert this router instance into the linked list of routers * that have been created with this module. @@ -605,38 +602,25 @@ int rc = 0; free(name); } - /* - * Now start the replication from the master to MaxScale + /** + * Check whether replication can be started */ if (inst->master_state == BLRM_UNCONNECTED) { - - /** blr_read_events_all() may truncate binlog file or put - * master_state into BLR_SLAVE_STOPPED state. - * If an open transaction is detected @ pos xxx - * inst->binlog_position will be set to xxx - * - * Files are now truncated and router is not in BLR_SLAVE_STOPPED state - */ - - int n; - n = blr_read_events_all_events(inst, 1, 0); + /* Check current binlog */ + if (inst->trx_safe && !blr_check_binlog(inst)) { + /* Don't start replication, just return */ + return (ROUTER *)inst; + } LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "blr_read_events_all_events() ret = %i\n", n))); - - fprintf(stderr, "blr_read_events_all_events() ret = %i\n", n); - fprintf(stderr, "current_pos / binlog_pos are [%llu] / [%llu]\n", inst->current_pos, inst->binlog_position); - - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "Binlog router: current binlog file is: %s, current position %u\n", - inst->binlog_name, inst->binlog_position))); + LOGFILE_TRACE, + "Binlog router: current binlog file is: %s, current position %u\n", + inst->binlog_name, inst->binlog_position))); /* Start replication from master server */ blr_start_master(inst); } - + return (ROUTER *)inst; } @@ -1816,3 +1800,59 @@ uint32_t rval = 0, shift = 0; } return rval; } + +/** + * Check whether current binlog is valid. + * In case of errors BLR_SLAVE_STOPPED state is set + * If a partial transaction is found + * inst->binlog_position is set the pos where it started + * + * @param router The router instance + * @return 1 on success, 0 on failure + */ +/** 1 is succes, 0 is faulure */ +static int blr_check_binlog(ROUTER_INSTANCE *router) { + int n; + + /** blr_read_events_all() may set master_state + * to BLR_SLAVE_STOPPED state in case of found errors. + * In such conditions binlog file is NOT truncated and + * router state is set to BLR_SLAVE_STOPPED + * Last commited pos is set for both router->binlog_position + * and router->current_pos. + * + * If an open transaction is detected at pos XYZ + * inst->binlog_position will be set to XYZ while + * router->current_pos is the last event found. + */ + + n = blr_read_events_all_events(router, 0, 0); + + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "blr_read_events_all_events() ret = %i\n", n))); + + fprintf(stderr, "blr_read_events_all_events() ret = %i\n", n); + fprintf(stderr, "current_pos / binlog_pos are [%llu] / [%llu]\n", router->current_pos, router->binlog_position); + + if (n != 0) { + char msg_err[1024 + 1] = ""; + router->master_state = BLRM_SLAVE_STOPPED; + + snprintf(msg_err, 1024, "Error found in binlog %s. Safe pos is %lu", router->binlog_name, router->binlog_position); + /* set mysql_errno */ + router->m_errno = 1111; + + /* set io error message */ + router->m_errmsg = strdup(msg_err); + + /* set last_safe_pos */ + router->last_safe_pos = router->binlog_position; + + /** LOG THE ERROR **/ + + return 0; + } else { + return 1; + } +} diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index 426b8958d..3386da804 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -1008,14 +1008,16 @@ int found_chksum = 0; "warning : an error has been found. " "Setting safe pos to %lu, current pos %lu", router->binlog_position, router->current_pos))); - - ftruncate(router->binlog_fd, router->binlog_position); - fsync(router->binlog_fd); + if (fix) { + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + } return 1; } else { router->binlog_position = pos; router->current_pos = pos; + return 0; } } @@ -1054,8 +1056,10 @@ int found_chksum = 0; "Setting safe pos to %lu, current pos %lu", router->binlog_position, router->current_pos))); + if (fix) { ftruncate(router->binlog_fd, router->binlog_position); fsync(router->binlog_fd); + } return 1; } @@ -1074,9 +1078,10 @@ int found_chksum = 0; "warning : an error has been found. " "Setting safe pos to %lu, current pos %lu", router->binlog_position, router->current_pos))); - - ftruncate(router->binlog_fd, router->binlog_position); - fsync(router->binlog_fd); + if (fix) { + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + } return 1; } @@ -1097,8 +1102,10 @@ int found_chksum = 0; "Setting safe pos to %lu, current pos %lu", router->binlog_position, router->current_pos))); - ftruncate(router->binlog_fd, router->binlog_position); - fsync(router->binlog_fd); + if (fix) { + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + } return 1; } @@ -1143,9 +1150,10 @@ int found_chksum = 0; "warning : an error has been found. " "Setting safe pos to %lu, current pos %lu", router->binlog_position, router->current_pos))); - - ftruncate(router->binlog_fd, router->binlog_position); - fsync(router->binlog_fd); + if (fix) { + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + } return 1; } @@ -1332,9 +1340,10 @@ int found_chksum = 0; "warning : an error has been found. " "Setting safe pos to %lu, current pos %lu", router->binlog_position, router->current_pos))); - - ftruncate(router->binlog_fd, router->binlog_position); - fsync(router->binlog_fd); + if (fix) { + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + } return 2; } @@ -1356,8 +1365,10 @@ int found_chksum = 0; "Setting safe pos to %lu, current pos %lu", router->binlog_position, router->current_pos))); - ftruncate(router->binlog_fd, router->binlog_position); - fsync(router->binlog_fd); + if (fix) { + ftruncate(router->binlog_fd, router->binlog_position); + fsync(router->binlog_fd); + } return 2; } diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 75d198bdb..379fe3691 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -966,6 +966,8 @@ int n_bufs = -1, pn_bufs = -1; if (hdr.ok == 0) { + spinlock_acquire(&router->lock); + /* set mysql errno to 0 */ router->m_errno = 0; @@ -974,6 +976,8 @@ int n_bufs = -1, pn_bufs = -1; free(router->m_errmsg); router->m_errmsg = NULL; + spinlock_release(&router->lock); + int event_limit; /* diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 6e1fb4ff9..6125c5d28 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -2544,6 +2544,7 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) router->master_state = BLRM_SLAVE_STOPPED; + /* set last_safe_pos */ router->last_safe_pos = router->binlog_position; /** @@ -2637,7 +2638,7 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) snprintf(msg, 1024, "A transaction is still opened at pos %lu in file %s. Truncating it ... Try START SLAVE again.", router->last_safe_pos, router->prevbinlog); - /* Truncate previous binlog file to safe pos */ + /* Truncate previous binlog file to last_safe pos */ snprintf(file, PATH_MAX, "%s/%s", router->binlogdir, router->prevbinlog); truncate(file, router->last_safe_pos); @@ -2658,7 +2659,6 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave) router->pending_transaction = 0; router->last_safe_pos = 0; - router->last_safe_pos = 0; router->master_state = BLRM_SLAVE_STOPPED; router->current_pos = 4; router->binlog_position = 4;