Added blr_check_binlog at router startup if trx_safe is set

Added blr_check_binlog at router startup if trx_safe is set
This commit is contained in:
MassimilianoPinto
2015-08-07 11:59:15 +02:00
parent e5a2e5cd1f
commit 479e4e3203
4 changed files with 115 additions and 60 deletions

View File

@ -40,6 +40,7 @@
* 29/06/2015 Massimiliano Pinto Addition of master.ini for easy startup configuration
* If not found router goes into BLRM_UNCONFIGURED state.
* Cache dir is 'cache' under router->binlogdir.
* 07/08/2015 Massimiliano Pinto Addition of binlog check at startup if trx_safe is on
*
* @endverbatim
*/
@ -99,9 +100,11 @@ static int blr_handle_config_item(const char *name, const char *value, ROUTER_IN
static int blr_set_service_mysql_user(SERVICE *service);
int blr_load_dbusers(ROUTER_INSTANCE *router);
int blr_save_dbusers(ROUTER_INSTANCE *router);
static int blr_check_binlog(ROUTER_INSTANCE *router);
extern void blr_cache_read_master_data(ROUTER_INSTANCE *router);
extern char *decryptPassword(char *crypt);
extern char *create_hex_sha1_sha1_passwd(char *passwd);
extern int blr_read_events_all_events(ROUTER_INSTANCE *router, int fix, int debug);
/** The module object definition */
static ROUTER_OBJECT MyObject = {
@ -533,34 +536,28 @@ int rc = 0;
" Fix errors in it or configure with CHANGE MASTER TO ...",
inst->service->name, inst->binlogdir)));
}
/* Set service user or load db users */
blr_set_service_mysql_user(inst->service);
} else {
/**
* master.ini configuration is ok
* Master registration can start
*/
inst->master_state = BLRM_UNCONNECTED;
master_info = 1;
}
/* Set service user or load db users */
if (inst->master_state == BLRM_UNCONFIGURED) {
blr_set_service_mysql_user(inst->service);
} else {
/* Try loading dbusers */
blr_load_dbusers(inst);
}
/*
* Read any cached response messages
*/
blr_cache_read_master_data(inst);
/*
* Initialise the binlog file and position
/**
* Initialise the binlog router
*/
if (inst->master_state == BLRM_UNCONNECTED) {
/* Read any cached response messages */
blr_cache_read_master_data(inst);
/* Find latest binlog file or create a new one (000001) */
if (blr_file_init(inst) == 0)
{
LOGIF(LE, (skygw_log_write_flush(
@ -583,7 +580,7 @@ int rc = 0;
}
}
/*
/**
* We have completed the creation of the instance data, so now
* insert this router instance into the linked list of routers
* that have been created with this module.
@ -605,38 +602,25 @@ int rc = 0;
free(name);
}
/*
* Now start the replication from the master to MaxScale
/**
* Check whether replication can be started
*/
if (inst->master_state == BLRM_UNCONNECTED) {
/** blr_read_events_all() may truncate binlog file or put
* master_state into BLR_SLAVE_STOPPED state.
* If an open transaction is detected @ pos xxx
* inst->binlog_position will be set to xxx
*
* Files are now truncated and router is not in BLR_SLAVE_STOPPED state
*/
int n;
n = blr_read_events_all_events(inst, 1, 0);
/* Check current binlog */
if (inst->trx_safe && !blr_check_binlog(inst)) {
/* Don't start replication, just return */
return (ROUTER *)inst;
}
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"blr_read_events_all_events() ret = %i\n", n)));
fprintf(stderr, "blr_read_events_all_events() ret = %i\n", n);
fprintf(stderr, "current_pos / binlog_pos are [%llu] / [%llu]\n", inst->current_pos, inst->binlog_position);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Binlog router: current binlog file is: %s, current position %u\n",
inst->binlog_name, inst->binlog_position)));
LOGFILE_TRACE,
"Binlog router: current binlog file is: %s, current position %u\n",
inst->binlog_name, inst->binlog_position)));
/* Start replication from master server */
blr_start_master(inst);
}
return (ROUTER *)inst;
}
@ -1816,3 +1800,59 @@ uint32_t rval = 0, shift = 0;
}
return rval;
}
/**
* Check whether current binlog is valid.
* In case of errors BLR_SLAVE_STOPPED state is set
* If a partial transaction is found
* inst->binlog_position is set the pos where it started
*
* @param router The router instance
* @return 1 on success, 0 on failure
*/
/** 1 is succes, 0 is faulure */
static int blr_check_binlog(ROUTER_INSTANCE *router) {
int n;
/** blr_read_events_all() may set master_state
* to BLR_SLAVE_STOPPED state in case of found errors.
* In such conditions binlog file is NOT truncated and
* router state is set to BLR_SLAVE_STOPPED
* Last commited pos is set for both router->binlog_position
* and router->current_pos.
*
* If an open transaction is detected at pos XYZ
* inst->binlog_position will be set to XYZ while
* router->current_pos is the last event found.
*/
n = blr_read_events_all_events(router, 0, 0);
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"blr_read_events_all_events() ret = %i\n", n)));
fprintf(stderr, "blr_read_events_all_events() ret = %i\n", n);
fprintf(stderr, "current_pos / binlog_pos are [%llu] / [%llu]\n", router->current_pos, router->binlog_position);
if (n != 0) {
char msg_err[1024 + 1] = "";
router->master_state = BLRM_SLAVE_STOPPED;
snprintf(msg_err, 1024, "Error found in binlog %s. Safe pos is %lu", router->binlog_name, router->binlog_position);
/* set mysql_errno */
router->m_errno = 1111;
/* set io error message */
router->m_errmsg = strdup(msg_err);
/* set last_safe_pos */
router->last_safe_pos = router->binlog_position;
/** LOG THE ERROR **/
return 0;
} else {
return 1;
}
}

View File

@ -1008,14 +1008,16 @@ int found_chksum = 0;
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
if (fix) {
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
}
return 1;
} else {
router->binlog_position = pos;
router->current_pos = pos;
return 0;
}
}
@ -1054,8 +1056,10 @@ int found_chksum = 0;
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
if (fix) {
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
}
return 1;
}
@ -1074,9 +1078,10 @@ int found_chksum = 0;
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
if (fix) {
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
}
return 1;
}
@ -1097,8 +1102,10 @@ int found_chksum = 0;
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
if (fix) {
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
}
return 1;
}
@ -1143,9 +1150,10 @@ int found_chksum = 0;
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
if (fix) {
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
}
return 1;
}
@ -1332,9 +1340,10 @@ int found_chksum = 0;
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
if (fix) {
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
}
return 2;
}
@ -1356,8 +1365,10 @@ int found_chksum = 0;
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
if (fix) {
ftruncate(router->binlog_fd, router->binlog_position);
fsync(router->binlog_fd);
}
return 2;
}

View File

@ -966,6 +966,8 @@ int n_bufs = -1, pn_bufs = -1;
if (hdr.ok == 0)
{
spinlock_acquire(&router->lock);
/* set mysql errno to 0 */
router->m_errno = 0;
@ -974,6 +976,8 @@ int n_bufs = -1, pn_bufs = -1;
free(router->m_errmsg);
router->m_errmsg = NULL;
spinlock_release(&router->lock);
int event_limit;
/*

View File

@ -2544,6 +2544,7 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
router->master_state = BLRM_SLAVE_STOPPED;
/* set last_safe_pos */
router->last_safe_pos = router->binlog_position;
/**
@ -2637,7 +2638,7 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
snprintf(msg, 1024, "A transaction is still opened at pos %lu in file %s. Truncating it ... Try START SLAVE again.", router->last_safe_pos, router->prevbinlog);
/* Truncate previous binlog file to safe pos */
/* Truncate previous binlog file to last_safe pos */
snprintf(file, PATH_MAX, "%s/%s", router->binlogdir, router->prevbinlog);
truncate(file, router->last_safe_pos);
@ -2658,7 +2659,6 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
router->pending_transaction = 0;
router->last_safe_pos = 0;
router->last_safe_pos = 0;
router->master_state = BLRM_SLAVE_STOPPED;
router->current_pos = 4;
router->binlog_position = 4;