Update messages on errors

Update messages on errors
This commit is contained in:
MassimilianoPinto 2015-08-09 23:14:55 +02:00
parent 12ddd8e830
commit 9bada09d2d
4 changed files with 69 additions and 61 deletions

View File

@ -271,7 +271,7 @@ int rc = 0;
inst->m_errno = 0; inst->m_errno = 0;
inst->m_errmsg = NULL; inst->m_errmsg = NULL;
inst->trx_safe = 0; inst->trx_safe = 1;
inst->pending_transaction = 0; inst->pending_transaction = 0;
inst->last_safe_pos = 0; inst->last_safe_pos = 0;
@ -425,8 +425,6 @@ int rc = 0;
service->name))); service->name)));
} }
fprintf(stderr, "Transaction safety is [%i]\n", inst->trx_safe);
if (inst->fileroot == NULL) if (inst->fileroot == NULL)
inst->fileroot = strdup(BINLOG_NAME_ROOT); inst->fileroot = strdup(BINLOG_NAME_ROOT);
inst->active_logs = 0; inst->active_logs = 0;
@ -602,6 +600,14 @@ int rc = 0;
free(name); free(name);
} }
/* Log whether the transaction safety option value is on*/
if (inst->trx_safe) {
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"%s: Service has transaction safety option set to ON",
service->name)));
}
/** /**
* Check whether replication can be started * Check whether replication can be started
*/ */
@ -614,7 +620,7 @@ int rc = 0;
LOGIF(LT, (skygw_log_write_flush( LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_TRACE,
"Binlog router: current binlog file is: %s, current position %u\n", "Current binlog file is %s, current pos is %lu\n",
inst->binlog_name, inst->binlog_position))); inst->binlog_name, inst->binlog_position)));
/* Start replication from master server */ /* Start replication from master server */
@ -954,7 +960,13 @@ struct tm tm;
dcb_printf(dcb, "\tCurrent binlog file: %s\n", dcb_printf(dcb, "\tCurrent binlog file: %s\n",
router_inst->binlog_name); router_inst->binlog_name);
dcb_printf(dcb, "\tCurrent binlog position: %u\n", dcb_printf(dcb, "\tCurrent binlog position: %u\n",
router_inst->current_pos);
if (router_inst->trx_safe) {
if (router_inst->pending_transaction) {
dcb_printf(dcb, "\tCurrent open transaction pos: %u\n",
router_inst->binlog_position); router_inst->binlog_position);
}
}
dcb_printf(dcb, "\tNumber of slave servers: %u\n", dcb_printf(dcb, "\tNumber of slave servers: %u\n",
router_inst->stats.n_slaves); router_inst->stats.n_slaves);
dcb_printf(dcb, "\tNo. of binlog events received this session: %u\n", dcb_printf(dcb, "\tNo. of binlog events received this session: %u\n",
@ -1828,20 +1840,17 @@ static int blr_check_binlog(ROUTER_INSTANCE *router) {
n = blr_read_events_all_events(router, 0, 0); n = blr_read_events_all_events(router, 0, 0);
LOGIF(LT, (skygw_log_write_flush( LOGIF(LD, (skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_DEBUG,
"blr_read_events_all_events() ret = %i\n", n))); "blr_read_events_all_events() ret = %i\n", n)));
fprintf(stderr, "blr_read_events_all_events() ret = %i\n", n);
fprintf(stderr, "current_pos / binlog_pos are [%llu] / [%llu]\n", router->current_pos, router->binlog_position);
if (n != 0) { if (n != 0) {
char msg_err[1024 + 1] = ""; char msg_err[1024 + 1] = "";
router->master_state = BLRM_SLAVE_STOPPED; router->master_state = BLRM_SLAVE_STOPPED;
snprintf(msg_err, 1024, "Error found in binlog %s. Safe pos is %lu", router->binlog_name, router->binlog_position); snprintf(msg_err, 1024, "Error found in binlog %s. Safe pos is %lu", router->binlog_name, router->binlog_position);
/* set mysql_errno */ /* set mysql_errno */
router->m_errno = 1111; router->m_errno = 2032;
/* set io error message */ /* set io error message */
router->m_errmsg = strdup(msg_err); router->m_errmsg = strdup(msg_err);
@ -1849,7 +1858,11 @@ static int blr_check_binlog(ROUTER_INSTANCE *router) {
/* set last_safe_pos */ /* set last_safe_pos */
router->last_safe_pos = router->binlog_position; router->last_safe_pos = router->binlog_position;
/** LOG THE ERROR **/ LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error found in binlog file %s. Safe starting pos is %lu",
router->binlog_name,
router->binlog_position)));
return 0; return 0;
} else { } else {

View File

@ -397,9 +397,10 @@ struct stat statb;
} }
if (fstat(file->fd, &statb) == 0) if (fstat(file->fd, &statb) == 0)
filelen = statb.st_size; filelen = statb.st_size;
if (pos >= filelen)
if (pos > filelen)
{ {
LOGIF(LD, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Attempting to read off the end of the binlog file %s, " "Attempting to read off the end of the binlog file %s, "
"event at %lu.", file->binlogname, pos))); "event at %lu.", file->binlogname, pos)));
return NULL; return NULL;
@ -408,9 +409,15 @@ struct stat statb;
if (strcmp(router->binlog_name, file->binlogname) == 0 && if (strcmp(router->binlog_name, file->binlogname) == 0 &&
pos >= router->binlog_position) pos >= router->binlog_position)
{ {
return NULL; if (pos > router->binlog_position)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Attempting to read off the binlog pos in file %s, "
"event at %lu.", file->binlogname, pos)));
} }
return NULL;
}
/* Read the header information from the file */ /* Read the header information from the file */
if ((n = pread(file->fd, hdbuf, 19, pos)) != 19) if ((n = pread(file->fd, hdbuf, 19, pos)) != 19)
@ -1032,19 +1039,14 @@ int found_chksum = 0;
hdr.next_pos = EXTRACT32(&hdbuf[13]); hdr.next_pos = EXTRACT32(&hdbuf[13]);
hdr.flags = EXTRACT16(&hdbuf[17]); hdr.flags = EXTRACT16(&hdbuf[17]);
//fprintf(stderr, ">>>> pos [%llu] event type %i\n", pos, hdr.event_type);
//fprintf(stderr, ">>>> pos [%llu] event size %lu\n", pos, hdr.event_size);
//fprintf(stderr, ">>>> pos [%llu] event next_pos %lu\n", pos, hdr.next_pos);
/* TO DO */ /* TO DO */
/* Add MariaDB 10 check for MAX_EVENT_TYPE */
/* Add MariaDB 10 check */
/* Check event type against MAX_EVENT_TYPE */ /* Check event type against MAX_EVENT_TYPE */
if (hdr.event_type > MAX_EVENT_TYPE) if (hdr.event_type > MAX_EVENT_TYPE)
{ {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Found an Invalid event type 0x%x. " "Found an Invalid event type 0x%x. "
"Binlog file is %s, position %llu", "Binlog file is %s, position %llu",
hdr.event_type, hdr.event_type,
router->binlog_name, pos))); router->binlog_name, pos)));
@ -1068,7 +1070,7 @@ int found_chksum = 0;
if (hdr.event_size <= 0) if (hdr.event_size <= 0)
{ {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: event size error: " "Event size error: "
"size %d at %llu.", "size %d at %llu.",
hdr.event_size, pos))); hdr.event_size, pos)));
@ -1121,7 +1123,7 @@ int found_chksum = 0;
if (n == -1) if (n == -1)
{ {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Error reading the event at %llu in %s. " "Error reading the event at %llu in %s. "
"%s, expected %d bytes.", "%s, expected %d bytes.",
pos, router->binlog_name, pos, router->binlog_name,
strerror(errno), hdr.event_size - 19))); strerror(errno), hdr.event_size - 19)));
@ -1129,14 +1131,14 @@ int found_chksum = 0;
else else
{ {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Short read when reading the event at %llu in %s. " "Short read when reading the event at %llu in %s. "
"Expected %d bytes got %d bytes.", "Expected %d bytes got %d bytes.",
pos, router->binlog_name, hdr.event_size - 19, n))); pos, router->binlog_name, hdr.event_size - 19, n)));
if (filelen > 0 && filelen - pos < hdr.event_size) if (filelen > 0 && filelen - pos < hdr.event_size)
{ {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Binlog event is close to the end of the binlog file %s, " "Binlog event is close to the end of the binlog file %s, "
" size is %lu.", " size is %lu.",
router->binlog_name, filelen))); router->binlog_name, filelen)));
} }
@ -1264,8 +1266,6 @@ int found_chksum = 0;
statement_sql = calloc(1, statement_len+1); statement_sql = calloc(1, statement_len+1);
strncpy(statement_sql, (char *)ptr+4+4+1+2+2+var_block_len+1+db_name_len, statement_len); strncpy(statement_sql, (char *)ptr+4+4+1+2+2+var_block_len+1+db_name_len, statement_len);
//fprintf(stderr, "QUERY_EVENT = [%s] %i / %i\n", statement_sql, (4+4+1+2+2+var_block_len+1+db_name_len), statement_len);
/* A transaction starts with this event */ /* A transaction starts with this event */
if (strncmp(statement_sql, "BEGIN", 5) == 0) { if (strncmp(statement_sql, "BEGIN", 5) == 0) {
if (pending_transaction > 0) { if (pending_transaction > 0) {
@ -1380,7 +1380,7 @@ int found_chksum = 0;
} else { } else {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Current event type %lu @ %llu has nex pos = %llu : exiting", hdr.event_type, pos, hdr.next_pos))); "Current event type %lu @ %llu has nex pos = %llu : exiting", hdr.event_type, pos, hdr.next_pos)));
break; break;
} }
} }

View File

@ -1016,39 +1016,26 @@ int n_bufs = -1, pn_bufs = -1;
router->lastEventReceived = hdr.event_type; router->lastEventReceived = hdr.event_type;
router->lastEventTimestamp = hdr.timestamp; router->lastEventTimestamp = hdr.timestamp;
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
/** /**
* Check for an open transaction, if the option is set * Check for an open transaction, if the option is set
* Only complete transactions should be sent to sleves * Only complete transactions should be sent to sleves
* *
* If a trasaction is pending router->last_commit_pos * If a trasaction is pending router->binlog_position
* won't be updated. * won't be updated to router->current_pos
*/ */
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) { if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
/* set last_commit_pos to binlog_position */ /* no pending transaction: set current_pos to binlog_position */
spinlock_acquire(&router->binlog_lock); spinlock_acquire(&router->binlog_lock);
router->binlog_position = router->current_pos; router->binlog_position = router->current_pos;
spinlock_release(&router->binlog_lock); spinlock_release(&router->binlog_lock);
} else {
/**
* A transaction is pending.
* Last_commit_pos is on hold.
*
* Log a message if the transaction was opened
* at binlog router start
*/
if (router->last_written == 0) {
fprintf(stderr, "*** Router started with an Open transaction at %lu / %lu\n", router->binlog_position, router->current_pos);
} }
}
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
/** /**
* Detect transactions in events * Detect transactions in events
@ -1072,24 +1059,31 @@ int n_bufs = -1, pn_bufs = -1;
/* Check for BEGIN (it comes for START TRANSACTION too) */ /* Check for BEGIN (it comes for START TRANSACTION too) */
if (strncmp(statement_sql, "BEGIN", 5) == 0) { if (strncmp(statement_sql, "BEGIN", 5) == 0) {
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction > 0) { if (router->pending_transaction > 0) {
fprintf(stderr, "*** A transaction is already open @ %lu and a new one starts @ %lu\n", router->binlog_position, router->current_pos); LOGIF(LE,(skygw_log_write_flush(LOGFILE_ERROR,
"Error: a transaction is already open "
"@ %lu and a new one starts @ %lu",
router->binlog_position,
router->current_pos)));
// An action should be taken here
} }
spinlock_acquire(&router->lock);
router->pending_transaction = 1; router->pending_transaction = 1;
spinlock_release(&router->binlog_lock); spinlock_release(&router->lock);
} }
/* Check for COMMIT in non transactional store engines */ /* Check for COMMIT in non transactional store engines */
if (strncmp(statement_sql, "COMMIT", 6) == 0) { if (strncmp(statement_sql, "COMMIT", 6) == 0) {
spinlock_acquire(&router->binlog_lock); spinlock_acquire(&router->lock);
router->pending_transaction = 2; router->pending_transaction = 2;
spinlock_release(&router->binlog_lock); spinlock_release(&router->lock);
} }
free(statement_sql); free(statement_sql);
@ -1101,11 +1095,11 @@ int n_bufs = -1, pn_bufs = -1;
xid = extract_field(ptr+4+20, 64); xid = extract_field(ptr+4+20, 64);
if (router->pending_transaction) { if (router->pending_transaction) {
spinlock_acquire(&router->binlog_lock); spinlock_acquire(&router->lock);
router->pending_transaction = 3; router->pending_transaction = 3;
spinlock_release(&router->binlog_lock); spinlock_release(&router->lock);
} }
} }
} }
@ -1231,12 +1225,12 @@ int n_bufs = -1, pn_bufs = -1;
* If transaction is closed: * If transaction is closed:
* *
* 1) read current binlog starting * 1) read current binlog starting
* from router->last_commit_pos * from router->binlog_position
* *
* 2) distribute read event * 2) distribute read event
* *
* 3) if current pos = router->binlog_position * 3) set router->binlog_position to
* update router->last_commit_pos * router->current_pos
* *
*/ */
@ -1538,7 +1532,7 @@ int action;
if (action == 1) if (action == 1)
{ {
if (slave->binlog_pos == router->last_written && if (slave->binlog_pos <= router->last_written &&
(strcmp(slave->binlogfile, router->binlog_name) == 0 || (strcmp(slave->binlogfile, router->binlog_name) == 0 ||
(hdr->event_type == ROTATE_EVENT && (hdr->event_type == ROTATE_EVENT &&
strcmp(slave->binlogfile, router->prevbinlog)))) strcmp(slave->binlogfile, router->prevbinlog))))

View File

@ -1769,6 +1769,7 @@ uint8_t *ptr;
} }
} }
slave->stats.n_bursts++; slave->stats.n_bursts++;
while (burst-- && burst_size > 0 && while (burst-- && burst_size > 0 &&
(record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr)) != NULL) (record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr)) != NULL)
{ {