Develop merge

Develop merge
This commit is contained in:
MassimilianoPinto
2015-11-19 17:06:30 +01:00
29 changed files with 5387 additions and 5203 deletions

View File

@ -65,7 +65,7 @@
static int blr_file_create(ROUTER_INSTANCE *router, char *file);
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
static void blr_log_header(int priority, char *msg, uint8_t *ptr);
void blr_cache_read_master_data(ROUTER_INSTANCE *router);
int blr_file_get_next_binlogname(ROUTER_INSTANCE *router);
int blr_file_new_binlog(ROUTER_INSTANCE *router, char *file);
@ -120,9 +120,8 @@ struct dirent *dp;
}
if (access(router->binlogdir, R_OK) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Unable to read the binlog directory %s.",
router->service->name, router->binlogdir)));
MXS_ERROR("%s: Unable to read the binlog directory %s.",
router->service->name, router->binlogdir);
return 0;
}
@ -131,10 +130,9 @@ struct dirent *dp;
if ((dirp = opendir(path)) == NULL)
{
char err_msg[BLRM_STRERROR_R_MSG_SIZE];
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Unable to read the binlog directory %s, %s.",
router->service->name, router->binlogdir,
strerror_r(errno, err_msg, sizeof(err_msg)))));
MXS_ERROR("%s: Unable to read the binlog directory %s, %s.",
router->service->name, router->binlogdir,
strerror_r(errno, err_msg, sizeof(err_msg)));
return 0;
}
while ((dp = readdir(dirp)) != NULL)
@ -233,9 +231,8 @@ int fd;
{
char err_msg[STRERROR_BUFLEN];
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Failed to create binlog file %s, %s.",
router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)))));
MXS_ERROR("%s: Failed to create binlog file %s, %s.",
router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
return 0;
}
fsync(fd);
@ -265,9 +262,8 @@ int fd;
if ((fd = open(path, O_RDWR|O_APPEND, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s for append.",
path)));
MXS_ERROR("Failed to open binlog file %s for append.",
path);
return;
}
fsync(fd);
@ -281,9 +277,8 @@ int fd;
} else {
/* If for any reason the file's length is between 1 and 3 bytes
* then report an error. */
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: binlog file %s has an invalid length %lu.",
router->service->name, path, router->current_pos)));
MXS_ERROR("%s: binlog file %s has an invalid length %lu.",
router->service->name, path, router->current_pos);
close(fd);
spinlock_release(&router->binlog_lock);
return;
@ -310,12 +305,11 @@ int n;
hdr->next_pos - hdr->event_size)) != hdr->event_size)
{
char err_msg[STRERROR_BUFLEN];
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Failed to write binlog record at %d of %s, %s. "
"Truncating to previous record.",
router->service->name, hdr->next_pos - hdr->event_size,
router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)))));
MXS_ERROR("%s: Failed to write binlog record at %d of %s, %s. "
"Truncating to previous record.",
router->service->name, hdr->next_pos - hdr->event_size,
router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)));
/* Remove any partual event that was written */
ftruncate(router->binlog_fd, hdr->next_pos - hdr->event_size);
return 0;
@ -379,8 +373,7 @@ BLFILE *file;
if ((file->fd = open(path, O_RDONLY, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s", path)));
MXS_ERROR("Failed to open binlog file %s", path);
free(file);
spinlock_release(&router->fileslock);
return NULL;
@ -479,9 +472,8 @@ struct stat statb;
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reached end of binlog file '%s' at %lu.",
file->binlogname, pos)));
MXS_DEBUG("Reached end of binlog file '%s' at %lu.",
file->binlogname, pos);
/* set ok indicator */
hdr->ok = SLAVE_POS_READ_OK;
@ -535,21 +527,19 @@ struct stat statb;
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position in header appears to be incorrect "
"rereading event header at pos %lu in file %s, "
"file size is %lu. Master will write %lu in %s next.",
pos, file->binlogname, filelen, router->binlog_position,
router->binlog_name)));
MXS_ERROR("Next position in header appears to be incorrect "
"rereading event header at pos %lu in file %s, "
"file size is %lu. Master will write %lu in %s next.",
pos, file->binlogname, filelen, router->binlog_position,
router->binlog_name);
if ((n = pread(file->fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos)) != BINLOG_EVENT_HDR_LEN)
{
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reached end of binlog file at %lu.",
pos)));
MXS_DEBUG("Reached end of binlog file at %lu.",
pos);
/* set ok indicator */
hdr->ok = SLAVE_POS_READ_OK;
@ -591,9 +581,8 @@ struct stat statb;
}
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position corrected by "
"rereading")));
MXS_ERROR("Next position corrected by "
"rereading");
}
}
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
@ -629,7 +618,7 @@ struct stat statb;
"current file size is %lu, event at %lu in binlog file '%s'",
filelen, pos, file->binlogname);
}
blr_log_header(LOGFILE_ERROR, "Possible malformed event header", hdbuf);
blr_log_header(LOG_ERR, "Possible malformed event header", hdbuf);
}
gwbuf_free(result);
@ -688,12 +677,12 @@ blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file)
/**
* Log the event header of binlog event
*
* @param file The log file into which to write the entry
* @param msg A message strign to preceed the header with
* @param ptr The event header raw data
* @param priority The syslog priority of the message (LOG_ERR, LOG_WARNING, etc.)
* @param msg A message strign to preceed the header with
* @param ptr The event header raw data
*/
static void
blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr)
blr_log_header(int priority, char *msg, uint8_t *ptr)
{
char buf[400], *bufp;
int i;
@ -702,8 +691,7 @@ int i;
bufp += sprintf(bufp, "%s: ", msg);
for (i = 0; i < BINLOG_EVENT_HDR_LEN; i++)
bufp += sprintf(bufp, "0x%02x ", ptr[i]);
skygw_log_write_flush(file, "%s", buf);
MXS_LOG_MESSAGE(priority, "%s", buf);
}
/**
@ -873,9 +861,8 @@ int fde_seen = 0;
memset(&fde_event, '\0', sizeof(fde_event));
if (router->binlog_fd == -1) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"ERROR: Current binlog file %s is not open",
router->binlog_name)));
MXS_ERROR("Current binlog file %s is not open",
router->binlog_name);
return 1;
}
@ -893,10 +880,9 @@ int fde_seen = 0;
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"End of binlog file [%s] at %llu.",
router->binlog_name,
pos)));
MXS_DEBUG("End of binlog file [%s] at %llu.",
router->binlog_name,
pos);
if (n_transactions)
average_events = (double)((double)total_events / (double)n_transactions) * (1.0);
if (n_transactions)
@ -922,24 +908,25 @@ int fde_seen = 0;
blr_format_event_size(&average_bytes, average_label);
blr_format_event_size(&format_max_bytes, max_label);
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"Transaction Summary for binlog '%s'\n"
"\t\t\tDescription %17s%17s%17s\n\t\t\t"
"No. of Transactions %16lu\n\t\t\t"
"No. of Events %16lu %16.1f %16lu\n\t\t\t"
"No. of Bytes %16.1f%s%16.1f%s%16.1f%s", router->binlog_name,
"Total", "Average", "Max",
n_transactions, total_events,
average_events, max_events,
format_total_bytes, total_label, average_bytes, average_label, format_max_bytes, max_label)));
MXS_NOTICE("Transaction Summary for binlog '%s'\n"
"\t\t\tDescription %17s%17s%17s\n\t\t\t"
"No. of Transactions %16lu\n\t\t\t"
"No. of Events %16lu %16.1f %16lu\n\t\t\t"
"No. of Bytes %16.1f%s%16.1f%s%16.1f%s",
router->binlog_name,
"Total", "Average", "Max",
n_transactions, total_events,
average_events, max_events,
format_total_bytes, total_label,
average_bytes, average_label,
format_max_bytes, max_label);
}
if (pending_transaction) {
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Warning : Binlog file %s contains a previous Opened Transaction"
" @ %llu. This pos is safe for slaves",
router->binlog_name,
last_known_commit)));
MXS_WARNING("Binlog file %s contains a previous Opened "
"Transaction @ %llu. This pos is safe for slaves",
router->binlog_name,
last_known_commit);
}
@ -948,24 +935,21 @@ int fde_seen = 0;
{
char err_msg[BLRM_STRERROR_R_MSG_SIZE+1] = "";
strerror_r(errno, err_msg, BLRM_STRERROR_R_MSG_SIZE);
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"ERROR: Failed to read binlog file %s at position %llu"
" (%s).", router->binlog_name,
pos, err_msg)));
MXS_ERROR("Failed to read binlog file %s at position %llu"
" (%s).", router->binlog_name,
pos, err_msg);
if (errno == EBADF)
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"ERROR: Bad file descriptor in read binlog for file %s"
", descriptor %d.",
router->binlog_name, router->binlog_fd)));
MXS_ERROR("Bad file descriptor in read binlog for file %s"
", descriptor %d.",
router->binlog_name, router->binlog_fd);
break;
}
default:
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"ERROR: Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %llu",
n, router->binlog_name, pos)));
MXS_ERROR("Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %llu",
n, router->binlog_name, pos);
break;
}
@ -993,16 +977,14 @@ int fde_seen = 0;
router->current_safe_event = last_known_commit;
router->current_pos = pos;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
MXS_WARNING("an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos);
if (fix) {
if (ftruncate(router->binlog_fd, router->binlog_position) == 0) {
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position)));
MXS_NOTICE("Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position);
fsync(router->binlog_fd);
}
}
@ -1030,21 +1012,19 @@ int fde_seen = 0;
if (router->mariadb10_compat) {
if (hdr.event_type > MAX_EVENT_TYPE_MARIADB10) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Invalid MariaDB 10 event type 0x%x. "
"Binlog file is %s, position %llu",
hdr.event_type,
router->binlog_name, pos)));
MXS_ERROR("Invalid MariaDB 10 event type 0x%x. "
"Binlog file is %s, position %llu",
hdr.event_type,
router->binlog_name, pos);
event_error = 1;
}
} else {
if (hdr.event_type > MAX_EVENT_TYPE) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Invalid event type 0x%x. "
"Binlog file is %s, position %llu",
hdr.event_type,
router->binlog_name, pos)));
MXS_ERROR("Invalid event type 0x%x. "
"Binlog file is %s, position %llu",
hdr.event_type,
router->binlog_name, pos);
event_error = 1;
}
@ -1056,19 +1036,17 @@ int fde_seen = 0;
router->current_safe_event = last_known_commit;
router->current_pos = pos;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"warning : an error has been found in %s. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_name,
router->binlog_position,
router->current_pos)));
MXS_WARNING("an error has been found in %s. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_name,
router->binlog_position,
router->current_pos);
if (fix) {
if (ftruncate(router->binlog_fd, router->binlog_position) == 0) {
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position)));
MXS_NOTICE("Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position);
fsync(router->binlog_fd);
}
}
@ -1078,25 +1056,22 @@ int fde_seen = 0;
if (hdr.event_size <= 0)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Event size error: "
"size %d at %llu.",
hdr.event_size, pos)));
MXS_ERROR("Event size error: "
"size %d at %llu.",
hdr.event_size, pos);
router->binlog_position = last_known_commit;
router->current_safe_event = last_known_commit;
router->current_pos = pos;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
MXS_WARNING("an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos);
if (fix) {
if (ftruncate(router->binlog_fd, router->binlog_position) == 0) {
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position)));
MXS_NOTICE("Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position);
fsync(router->binlog_fd);
}
}
@ -1107,26 +1082,23 @@ int fde_seen = 0;
/* Allocate a GWBUF for the event */
if ((result = gwbuf_alloc(hdr.event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"ERROR: Failed to allocate memory for binlog entry, "
"size %d at %llu.",
hdr.event_size, pos)));
MXS_ERROR("Failed to allocate memory for binlog entry, "
"size %d at %llu.",
hdr.event_size, pos);
router->binlog_position = last_known_commit;
router->current_safe_event = last_known_commit;
router->current_pos = pos;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
MXS_WARNING("an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos);
if (fix) {
if (ftruncate(router->binlog_fd, router->binlog_position) == 0) {
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position)));
MXS_NOTICE("Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position);
fsync(router->binlog_fd);
}
}
@ -1145,25 +1117,23 @@ int fde_seen = 0;
{
char err_msg[BLRM_STRERROR_R_MSG_SIZE+1] = "";
strerror_r(errno, err_msg, BLRM_STRERROR_R_MSG_SIZE);
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error reading the event at %llu in %s. "
"%s, expected %d bytes.",
pos, router->binlog_name,
err_msg, hdr.event_size - BINLOG_EVENT_HDR_LEN)));
MXS_ERROR("Error reading the event at %llu in %s. "
"%s, expected %d bytes.",
pos, router->binlog_name,
err_msg, hdr.event_size - BINLOG_EVENT_HDR_LEN);
}
else
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Short read when reading the event at %llu in %s. "
"Expected %d bytes got %d bytes.",
pos, router->binlog_name, hdr.event_size - BINLOG_EVENT_HDR_LEN, n)));
MXS_ERROR("Short read when reading the event at %llu in %s. "
"Expected %d bytes got %d bytes.",
pos, router->binlog_name,
hdr.event_size - BINLOG_EVENT_HDR_LEN, n);
if (filelen > 0 && filelen - pos < hdr.event_size)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog event is close to the end of the binlog file %s, "
" size is %lu.",
router->binlog_name, filelen)));
MXS_ERROR("Binlog event is close to the end of the binlog file %s, "
" size is %lu.",
router->binlog_name, filelen);
}
}
@ -1173,16 +1143,14 @@ int fde_seen = 0;
router->current_safe_event = last_known_commit;
router->current_pos = pos;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
MXS_WARNING("an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos);
if (fix) {
if (ftruncate(router->binlog_fd, router->binlog_position) == 0) {
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position)));
MXS_NOTICE("Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position);
fsync(router->binlog_fd);
}
}
@ -1229,9 +1197,8 @@ int fde_seen = 0;
}
if(debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"- Format Description event FDE @ %llu, size %lu, time %lu (%s)",
pos, (unsigned long)hdr.event_size, fde_event.event_time, buf_t)));
MXS_DEBUG("- Format Description event FDE @ %llu, size %lu, time %lu (%s)",
pos, (unsigned long)hdr.event_size, fde_event.event_time, buf_t);
event_header_length = ptr[2 + 50 + 4];
event_header_ntypes = hdr.event_size - event_header_length - (2 + 50 + 4 + 1);
@ -1252,14 +1219,12 @@ int fde_seen = 0;
n_events = hdr.event_size - event_header_length - (2 + 50 + 4 + 1);
if(debug) {
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" FDE ServerVersion [%50s]", ptr + 2)));
MXS_DEBUG(" FDE ServerVersion [%50s]", ptr + 2);
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" FDE Header EventLength %i"
", N. of supported MySQL/MariaDB events %i",
event_header_length,
(n_events - event_header_ntypes))));
MXS_DEBUG(" FDE Header EventLength %i"
", N. of supported MySQL/MariaDB events %i",
event_header_length,
(n_events - event_header_ntypes));
}
if (event_header_ntypes < n_events) {
@ -1267,10 +1232,10 @@ int fde_seen = 0;
check_alg = checksum[0];
if(debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" FDE Checksum alg desc %i, alg type %s",
check_alg,
check_alg == 1 ? "BINLOG_CHECKSUM_ALG_CRC32" : "NONE or UNDEF")));
MXS_DEBUG(" FDE Checksum alg desc %i, alg type %s",
check_alg,
check_alg == 1 ?
"BINLOG_CHECKSUM_ALG_CRC32" : "NONE or UNDEF");
if (check_alg == 1) {
found_chksum = 1;
} else {
@ -1303,9 +1268,8 @@ int fde_seen = 0;
file[slen] = 0;
if(debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"- Rotate event @ %llu, next file is [%s] @ %lu",
pos, file, new_pos)));
MXS_DEBUG("- Rotate event @ %llu, next file is [%s] @ %lu",
pos, file, new_pos);
}
/* If MariaDB 10 compatibility:
@ -1325,11 +1289,11 @@ int fde_seen = 0;
if (flags == 0) {
if (pending_transaction > 0) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"ERROR: Transaction cannot be @ pos %llu: "
"Another MariaDB 10 transaction (GTID %u-%u-%lu)"
" was opened at %llu",
pos, domainid, hdr.serverid, n_sequence, last_known_commit)));
MXS_ERROR("Transaction cannot be @ pos %llu: "
"Another MariaDB 10 transaction (GTID %u-%u-%lu)"
" was opened at %llu",
pos, domainid, hdr.serverid,
n_sequence, last_known_commit);
gwbuf_free(result);
@ -1341,10 +1305,9 @@ int fde_seen = 0;
event_bytes = 0;
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"> MariaDB 10 Transaction (GTID %u-%u-%lu)"
" starts @ pos %llu",
domainid, hdr.serverid, n_sequence, pos)));
MXS_DEBUG("> MariaDB 10 Transaction (GTID %u-%u-%lu)"
" starts @ pos %llu",
domainid, hdr.serverid, n_sequence, pos);
}
}
}
@ -1370,10 +1333,9 @@ int fde_seen = 0;
/* A transaction starts with this event */
if (strncmp(statement_sql, "BEGIN", 5) == 0) {
if (pending_transaction > 0) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"ERROR: Transaction cannot be @ pos %llu: "
"Another transaction was opened at %llu",
pos, last_known_commit)));
MXS_ERROR("Transaction cannot be @ pos %llu: "
"Another transaction was opened at %llu",
pos, last_known_commit);
free(statement_sql);
gwbuf_free(result);
@ -1386,8 +1348,7 @@ int fde_seen = 0;
event_bytes = 0;
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"> Transaction starts @ pos %llu", pos)));
MXS_DEBUG("> Transaction starts @ pos %llu", pos);
}
}
@ -1397,8 +1358,8 @@ int fde_seen = 0;
pending_transaction = 3;
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" Transaction @ pos %llu, closing @ %llu", last_known_commit, pos)));
MXS_DEBUG(" Transaction @ pos %llu, closing @ %llu",
last_known_commit, pos);
}
}
free(statement_sql);
@ -1411,15 +1372,15 @@ int fde_seen = 0;
if (pending_transaction > 0) {
pending_transaction = 2;
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" Transaction XID @ pos %llu, closing @ %llu", last_known_commit, pos)));
MXS_DEBUG(" Transaction XID @ pos %llu, closing @ %llu",
last_known_commit, pos);
}
}
if (pending_transaction > 1) {
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"< Transaction @ pos %llu, is now closed @ %llu. %lu events seen", last_known_commit, pos, transaction_events)));
MXS_DEBUG("< Transaction @ pos %llu, is now closed @ %llu. %lu events seen",
last_known_commit, pos, transaction_events);
pending_transaction = 0;
last_known_commit = pos;
@ -1435,27 +1396,24 @@ int fde_seen = 0;
/* pos and next_pos sanity checks */
if (hdr.next_pos > 0 && hdr.next_pos < pos) {
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Binlog %s: next pos %u < pos %llu, truncating to %llu",
router->binlog_name,
hdr.next_pos,
pos,
pos)));
MXS_INFO("Binlog %s: next pos %u < pos %llu, truncating to %llu",
router->binlog_name,
hdr.next_pos,
pos,
pos);
router->binlog_position = last_known_commit;
router->current_safe_event = last_known_commit;
router->current_pos = pos;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
MXS_WARNING("an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos);
if (fix) {
if (ftruncate(router->binlog_fd, router->binlog_position) == 0) {
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position)));
MXS_NOTICE("Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position);
fsync(router->binlog_fd);
}
}
@ -1464,29 +1422,26 @@ int fde_seen = 0;
}
if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size)) {
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Binlog %s: next pos %u != (pos %llu + event_size %u), truncating to %llu",
router->binlog_name,
hdr.next_pos,
pos,
hdr.event_size,
pos)));
MXS_INFO("Binlog %s: next pos %u != (pos %llu + event_size %u), truncating to %llu",
router->binlog_name,
hdr.next_pos,
pos,
hdr.event_size,
pos);
router->binlog_position = last_known_commit;
router->current_safe_event = last_known_commit;
router->current_pos = pos;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
MXS_WARNING("an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos);
if (fix) {
if (ftruncate(router->binlog_fd, router->binlog_position) == 0) {
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position)));
MXS_NOTICE("Binlog file %s has been truncated at %lu",
router->binlog_name,
router->binlog_position);
fsync(router->binlog_fd);
}
}
@ -1508,8 +1463,8 @@ int fde_seen = 0;
pos = hdr.next_pos;
} else {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Current event type %d @ %llu has nex pos = %u : exiting", hdr.event_type, pos, hdr.next_pos)));
MXS_ERROR("Current event type %d @ %llu has nex pos = %u : exiting",
hdr.event_type, pos, hdr.next_pos);
break;
}
@ -1517,20 +1472,18 @@ int fde_seen = 0;
}
if (pending_transaction) {
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Binlog %s contains an Open Transaction, truncating to %llu",
router->binlog_name,
last_known_commit)));
MXS_INFO("Binlog %s contains an Open Transaction, truncating to %llu",
router->binlog_name,
last_known_commit);
router->binlog_position = last_known_commit;
router->current_safe_event = last_known_commit;
router->current_pos = pos;
router->pending_transaction = 1;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"warning : an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos)));
MXS_WARNING("an error has been found. "
"Setting safe pos to %lu, current pos %lu",
router->binlog_position, router->current_pos);
return 0;
} else {
@ -1727,10 +1680,9 @@ char *event_desc;
event_desc = blr_get_event_description(router, first_event.event_type);
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"%lu @ %llu, %s, (%s), First EventTime",
first_event.event_time, first_event.event_pos,
event_desc != NULL ? event_desc : "unknown", buf_t)));
MXS_NOTICE("%lu @ %llu, %s, (%s), First EventTime",
first_event.event_time, first_event.event_pos,
event_desc != NULL ? event_desc : "unknown", buf_t);
/* Last Event */
localtime_r(&last_event.event_time, &tm_t);
@ -1742,9 +1694,8 @@ char *event_desc;
event_desc = blr_get_event_description(router, last_event.event_type);
LOGIF(LM, (skygw_log_write_flush(LOGFILE_MESSAGE,
"%lu @ %llu, %s, (%s), Last EventTime",
last_event.event_time, last_event.event_pos,
event_desc != NULL ? event_desc : "unknown", buf_t)));
MXS_NOTICE("%lu @ %llu, %s, (%s), Last EventTime",
last_event.event_time, last_event.event_pos,
event_desc != NULL ? event_desc : "unknown", buf_t);
}