Initial implementation of transaction safety

Initial implementation of transaction safety
This commit is contained in:
MassimilianoPinto
2015-08-05 14:29:17 +02:00
parent 619261cd21
commit f91a339674
5 changed files with 836 additions and 39 deletions

View File

@ -33,6 +33,7 @@
* 12/06/15 Massimiliano Pinto Added mariadb10 new events
* 23/06/15 Massimiliano Pinto Addition of MASTER_SERVER_CFG struct
* 24/06/15 Massimiliano Pinto Added BLRM_UNCONFIGURED state
* 05/08/15 Massimiliano Pinto Initial implementation of transaction safety
*
* @endverbatim
*/
@ -357,9 +358,13 @@ typedef struct router_instance {
MASTER_RESPONSES saved_master; /*< Saved master responses */
char *binlogdir; /*< The directory with the binlog files */
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
int trx_safe; /*< Detect and handle partial transactions */
int pending_transaction; /*< Pending transaction */
char binlog_name[BINLOG_FNAMELEN+1];
/*< Name of the current binlog file */
uint64_t binlog_position;
/*< last committed transaction position */
uint64_t current_pos;
/*< Current binlog position */
int binlog_fd; /*< File descriptor of the binlog
* file being written

View File

@ -268,6 +268,9 @@ int rc = 0;
inst->m_errno = 0;
inst->m_errmsg = NULL;
inst->trx_safe = 0;
inst->pending_transaction = 0;
my_uuid_init((ulong)rand()*12345,12345);
if ((defuuid = (unsigned char *)malloc(20)) != NULL)
{
@ -348,6 +351,10 @@ int rc = 0;
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "transaction_safety") == 0)
{
inst->trx_safe = atoi(value);
}
else if (strcmp(options[i], "lowwater") == 0)
{
inst->low_water = atoi(value);
@ -414,6 +421,8 @@ int rc = 0;
service->name)));
}
fprintf(stderr, "Transaction safety is [%i]\n", inst->trx_safe);
if (inst->fileroot == NULL)
inst->fileroot = strdup(BINLOG_NAME_ROOT);
inst->active_logs = 0;
@ -426,6 +435,8 @@ int rc = 0;
inst->lastEventTimestamp = 0;
inst->binlog_position = 0;
inst->current_pos = 0;
strcpy(inst->binlog_name, "");
strcpy(inst->prevbinlog, "");
@ -602,6 +613,18 @@ int rc = 0;
* Now start the replication from the master to MaxScale
*/
if (inst->master_state == BLRM_UNCONNECTED) {
/* NOTE: This setting will be replaced by calling
* blr_read_events_all() routine soon
* The routine may truncate binlog file or put
* master_state into BLR_SLAVE_STOPPED state.
* If an open transaction is detected @ pos xxx
* inst->binlog_position will be set to xxx
*/
if (inst->binlog_position == 0)
inst->binlog_position = inst->current_pos;
/* Start replication from master server */
blr_start_master(inst);
}

View File

@ -31,6 +31,7 @@
* 23/06/2015 Massimiliano Pinto Addition of blr_file_use_binlog, blr_file_create_binlog
* 29/06/2015 Massimiliano Pinto Addition of blr_file_write_master_config()
* Cache directory is now 'cache' under router->binlogdir
* 05/08/2015 Massimiliano Pinto Initial implementation of transaction safety
*
* @endverbatim
*/
@ -184,7 +185,8 @@ blr_file_add_magic(ROUTER_INSTANCE *router, int fd)
unsigned char magic[] = BINLOG_MAGIC;
write(fd, magic, 4);
router->binlog_position = 4; /* Initial position after the magic number */
router->current_pos = 4; /* Initial position after the magic number */
//router->binlog_position = 4; /* Initial position after the magic number */
}
@ -252,16 +254,20 @@ int fd;
close(router->binlog_fd);
spinlock_acquire(&router->binlog_lock);
strncpy(router->binlog_name, file,BINLOG_FNAMELEN);
router->binlog_position = lseek(fd, 0L, SEEK_END);
if (router->binlog_position < 4) {
if (router->binlog_position == 0) {
//router->binlog_position = lseek(fd, 0L, SEEK_END);
router->current_pos = lseek(fd, 0L, SEEK_END);
//if (router->binlog_position < 4) {
if (router->current_pos < 4) {
//if (router->binlog_position == 0) {
if (router->current_pos == 0) {
blr_file_add_magic(router, fd);
} else {
/* If for any reason the file's length is between 1 and 3 bytes
* then report an error. */
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: binlog file %s has an invalid length %d.",
router->service->name, path, router->binlog_position)));
//router->service->name, path, router->binlog_position)));
router->service->name, path, router->current_pos)));
close(fd);
spinlock_release(&router->binlog_lock);
return;
@ -298,7 +304,8 @@ int n;
return 0;
}
spinlock_acquire(&router->binlog_lock);
router->binlog_position = hdr->next_pos;
//router->binlog_position = hdr->next_pos;
router->current_pos = hdr->next_pos;
router->last_written = hdr->next_pos - hdr->event_size;
spinlock_release(&router->binlog_lock);
return n;
@ -893,3 +900,421 @@ char tmp_file[PATH_MAX + 1] = "";
return 0;
}
/**
* Read all replication events from a binlog file in order to detect pending transactions
*
* @param router The router instance
* @param fix Whether to fix or not errors
* @param debug Whether to enable or not the debug for events
* @return 0 on success, >0 on failure
*/
int
blr_read_events_all_events(ROUTER_INSTANCE *router, int fix, int debug) {
unsigned long filelen = 0;
struct stat statb;
uint8_t hdbuf[19];
uint8_t *data;
GWBUF *result;
unsigned long long pos = 4;
unsigned long long last_known_commit = 4;
REP_HEADER hdr;
int pending_transaction = 0;
int n;
int db_name_len;
char *statement_sql;
uint8_t *ptr;
int len;
int var_block_len;
int statement_len;
int checksum_len=0;
int found_chksum = 0;
if (router->binlog_fd == -1) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Current binlog file %s is not open",
router->binlog_name)));
return 1;
}
if (fstat(router->binlog_fd, &statb) == 0)
filelen = statb.st_size;
while (1){
//fprintf(stderr, "Pos %llu pending trx = %i\n", pos, pending_transaction);
/* Read the header information from the file */
if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19) {
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"End of binlog file [%s] at %llu.",
router->binlog_name,
pos)));
if (pending_transaction) {
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"WARNING: Binlog file %s contains a previous Opened Transaction, "
"Not Truncate file @ %llu but pos for slave is safe",
router->binlog_name,
last_known_commit)));
//ftruncate(router->binlog_fd, last_known_commit);
}
break;
case -1:
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Failed to read binlog file %s at position %llu"
" (%s).", router->binlog_name,
pos, strerror(errno))));
if (errno == EBADF)
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Bad file descriptor in read binlog for file %s"
", descriptor %d.",
router->binlog_name, router->binlog_fd)));
break;
default:
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %llu",
n, router->binlog_name, pos)));
break;
}
/* force last_known_commit position */
if (pending_transaction) {
router->binlog_position = last_known_commit;
router->pending_transaction = 1;
pending_transaction = 0;
} else {
router->binlog_position = pos;
}
/* Truncate file in case of any error */
if (n != 0) {
ftruncate(router->binlog_fd, pos);
router->binlog_position = pos;
return 1;
} else {
return 0;
}
}
/* fill replication header struct */
hdr.timestamp = EXTRACT32(hdbuf);
hdr.event_type = hdbuf[4];
hdr.serverid = EXTRACT32(&hdbuf[5]);
hdr.event_size = extract_field(&hdbuf[9], 32);
hdr.next_pos = EXTRACT32(&hdbuf[13]);
hdr.flags = EXTRACT16(&hdbuf[17]);
//fprintf(stderr, ">>>> pos [%llu] event type %i\n", pos, hdr.event_type);
//fprintf(stderr, ">>>> pos [%llu] event size %lu\n", pos, hdr.event_size);
//fprintf(stderr, ">>>> pos [%llu] event next_pos %lu\n", pos, hdr.next_pos);
/* TO DO */
/* Add MariaDB 10 check */
/* Check event type against MAX_EVENT_TYPE */
if (hdr.event_type > MAX_EVENT_TYPE)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Found an Invalid event type 0x%x. "
"Binlog file is %s, position %llu",
hdr.event_type,
router->binlog_name, pos)));
ftruncate(router->binlog_fd, pos);
router->binlog_position = pos;
return 1;
}
/* Allocate a GWBUF for the event */
if ((result = gwbuf_alloc(hdr.event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Failed to allocate memory for binlog entry, "
"size %d at %llu.",
hdr.event_size, pos)));
ftruncate(router->binlog_fd, pos);
router->binlog_position = pos;
return 1;
}
/* Copy the header in the buffer */
data = GWBUF_DATA(result);
memcpy(data, hdbuf, 19);// Copy the header in
/* Read event data */
if ((n = pread(router->binlog_fd, &data[19], hdr.event_size - 19, pos + 19)) != hdr.event_size - 19)
{
if (n == -1)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Error reading the event at %llu in %s. "
"%s, expected %d bytes.",
pos, router->binlog_name,
strerror(errno), hdr.event_size - 19)));
}
else
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Short read when reading the event at %llu in %s. "
"Expected %d bytes got %d bytes.",
pos, router->binlog_name, hdr.event_size - 19, n)));
if (filelen > 0 && filelen - pos < hdr.event_size)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Binlog event is close to the end of the binlog file %s, "
" size is %lu.",
router->binlog_name, filelen)));
}
}
gwbuf_free(result);
ftruncate(router->binlog_fd, pos);
router->binlog_position = pos;
return 1;
}
/* check for pending transaction */
if (pending_transaction == 0) {
last_known_commit = pos;
}
/* get event content */
ptr = data+19;
/* check for FORMAT DESCRIPTION EVENT */
if(hdr.event_type == FORMAT_DESCRIPTION_EVENT) {
int event_header_length;
int event_header_ntypes;
int n_events;
int check_alg;
uint8_t *checksum;
if(debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"- Format Description event FDE @ %llu, size %lu",
pos, hdr.event_size)));
event_header_length = ptr[2 + 50 + 4];
event_header_ntypes = hdr.event_size - event_header_length - (2 + 50 + 4 + 1);
if (event_header_ntypes == 168) {
/* mariadb 10 LOG_EVENT_TYPES*/
event_header_ntypes -= 163;
} else {
if (event_header_ntypes == 165) {
/* mariadb 5 LOG_EVENT_TYPES*/
event_header_ntypes -= 160;
} else {
/* mysql 5.6 LOG_EVENT_TYPES = 35 */
event_header_ntypes -= 35;
}
}
n_events = hdr.event_size - event_header_length - (2 + 50 + 4 + 1);
if(debug) {
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" FDE ServerVersion [%50s]", ptr + 2)));
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" FDE Header EventLength %i"
", N. of supported MySQL/MAriaDB events %i",
event_header_length,
(n_events - event_header_ntypes))));
}
if (event_header_ntypes < n_events) {
checksum = ptr + hdr.event_size - event_header_length - event_header_ntypes;
check_alg = checksum[0];
if(debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" FDE Checksum alg desc %i, alg type %s",
check_alg,
check_alg == 1 ? "BINLOG_CHECKSUM_ALG_CRC32" : "NONE or UNDEF")));
if (check_alg == 1) {
checksum_len = 4;
found_chksum = 1;
} else {
found_chksum = 0;
}
}
}
/* Decode ROTATE EVENT */
if(hdr.event_type == ROTATE_EVENT) {
int len, slen;
uint64_t new_pos;
char file[BINLOG_FNAMELEN+1];
len = hdr.event_size - 19;
new_pos = extract_field(ptr+4, 32);
new_pos <<= 32;
new_pos |= extract_field(ptr, 32);
slen = len - (8 + 4); // Allow for position and CRC
if (found_chksum == 0)
slen += 4;
if (slen > BINLOG_FNAMELEN)
slen = BINLOG_FNAMELEN;
memcpy(file, ptr + 8, slen);
file[slen] = 0;
if(debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"- Rotate event @ %llu, next file is [%s] @ %llu",
pos, file, new_pos)));
}
/* Check QUERY_EVENT */
if(hdr.event_type == QUERY_EVENT) {
char *statement_sql;
db_name_len = ptr[4 + 4];
var_block_len = ptr[4 + 4 + 1 + 2];
statement_len = hdr.event_size - 19 - (4+4+1+2+2+var_block_len+1+db_name_len);
//if (checksum_len)
// statement_len -= checksum_len;
statement_sql = calloc(1, statement_len+1);
strncpy(statement_sql, (char *)ptr+4+4+1+2+2+var_block_len+1+db_name_len, statement_len);
//fprintf(stderr, "QUERY_EVENT = [%s] %i / %i\n", statement_sql, (4+4+1+2+2+var_block_len+1+db_name_len), statement_len);
/* A transaction starts with this event */
if (strncmp(statement_sql, "BEGIN", 5) == 0) {
if (pending_transaction > 0) {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Transaction cannot be @ pos %llu: "
"Another transaction was opened at %ll",
pos, last_known_commit)));
free(statement_sql);
gwbuf_free(result);
break;
} else {
pending_transaction = 1;
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"> Transaction starts @ pos %llu", pos)));
}
}
/* Commit received for non transactional tables, i.e. MyISAM */
if (strncmp(statement_sql, "COMMIT", 6) == 0) {
if (pending_transaction > 0) {
pending_transaction = 3;
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" Transaction @ pos %llu, closing @ %llu", last_known_commit, pos)));
}
}
free(statement_sql);
}
if(hdr.event_type == XID_EVENT) {
/* Commit received for a transactional tables, i.e. InnoDB */
uint64_t xid;
xid = extract_field(ptr, 64);
if (pending_transaction > 0) {
pending_transaction = 2;
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
" Transaction XID @ pos %llu, closing @ %llu", last_known_commit, pos)));
}
}
if (pending_transaction > 1) {
unsigned long long prev_pos = last_known_commit;
if (debug)
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"< Transaction @ pos %llu, is now closed @ %llu", last_known_commit, pos)));
pending_transaction = 0;
last_known_commit = pos;
}
gwbuf_free(result);
/* pos and next_pos sanity checks */
if (hdr.next_pos > 0 && hdr.next_pos < pos) {
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Binlog %s: next pos %llu < pos %llu, truncating to %llu",
router->binlog_name,
hdr.next_pos,
pos,
pos)));
ftruncate(router->binlog_fd, pos);
router->binlog_position = pos;
return 3;
}
if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size)) {
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Binlog %s: next pos %llu != (pos %llu + event_size %llu), truncating to %llu",
router->binlog_name,
hdr.next_pos,
pos,
hdr.event_size,
pos)));
ftruncate(router->binlog_fd, pos);
router->binlog_position = pos;
return 3;
}
/* set pos to new value */
if (hdr.next_pos > 0) {
pos = hdr.next_pos;
} else {
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"*** ERROR: Current event type %lu @ %llu has nex pos = %llu : exiting", hdr.event_type, pos, hdr.next_pos)));
break;
}
}
if (pending_transaction) {
LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE,
"Binlog %s contains an Open Transaction, truncating to %llu",
router->binlog_name,
last_known_commit)));
ftruncate(router->binlog_fd, last_known_commit);
router->binlog_position = last_known_commit;
return 2;
} else {
router->binlog_position = pos;
return 0;
}
}

View File

@ -39,6 +39,7 @@
* 23/06/2015 Massimiliano Pinto Master communication goes into BLRM_SLAVE_STOPPED state
* when an error is encountered in BLRM_BINLOGDUMP state.
* Server error code and msg are reported via SHOW SLAVE STATUS
* 03/08/2015 Massimiliano Pinto Initial implementation of transaction safety
*
* @endverbatim
*/
@ -87,6 +88,7 @@ void blr_master_close(ROUTER_INSTANCE *);
static char *blr_extract_column(GWBUF *buf, int col);
void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf);
void poll_fake_write_event(DCB *dcb);
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr);
static int keepalive = 1;
/**
@ -167,7 +169,7 @@ DCB *client;
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: attempting to connect to master server %s:%d, binlog %s, pos %lu",
router->service->name, router->service->dbref->server->name, router->service->dbref->server->port, router->binlog_name, router->binlog_position)));
router->service->name, router->service->dbref->server->name, router->service->dbref->server->port, router->binlog_name, router->current_pos)));
router->connect_time = time(0);
@ -631,7 +633,7 @@ char query[128];
"%s: Request binlog records from %s at "
"position %lu from master server %s:%d",
router->service->name, router->binlog_name,
router->binlog_position,
router->current_pos,
router->service->dbref->server->name,
router->service->dbref->server->port)));
break;
@ -729,7 +731,7 @@ int len = 0x1b;
data[3] = 0; // Sequence ID
data[4] = COM_BINLOG_DUMP; // Command
encode_value(&data[5],
router->binlog_position, 32); // binlog position
router->current_pos, 32); // binlog position
encode_value(&data[9], 0, 16); // Flags
encode_value(&data[11],
router->serverid, 32); // Server-id of MaxScale
@ -842,7 +844,7 @@ int n_bufs = -1, pn_bufs = -1;
"Insufficient memory to buffer event "
"of %d bytes. Binlog %s @ %d.",
len, router->binlog_name,
router->binlog_position)));
router->current_pos)));
break;
}
@ -867,7 +869,7 @@ int n_bufs = -1, pn_bufs = -1;
"chain, but failed to create complete "
"message as expected. %s @ %d",
router->binlog_name,
router->binlog_position)));
router->current_pos)));
free(msg);
msg = NULL;
break;
@ -888,7 +890,7 @@ int n_bufs = -1, pn_bufs = -1;
LOGFILE_DEBUG,
"Residual data left after %d records. %s @ %d",
router->stats.n_binlogs,
router->binlog_name, router->binlog_position)));
router->binlog_name, router->current_pos)));
break;
}
else
@ -943,7 +945,7 @@ int n_bufs = -1, pn_bufs = -1;
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->binlog_position,
router->current_pos,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
@ -1000,7 +1002,7 @@ int n_bufs = -1, pn_bufs = -1;
"Closing master connection.",
router->service->name,
router->binlog_name,
router->binlog_position)));
router->current_pos)));
blr_master_close(router);
blr_master_delayed_connect(router);
return;
@ -1010,10 +1012,109 @@ int n_bufs = -1, pn_bufs = -1;
router->lastEventReceived = hdr.event_type;
router->lastEventTimestamp = hdr.timestamp;
/**
* Check for an open transaction, if the option is set
* Only complete transactions should be sent to sleves
*
* If a trasaction is pending router->last_commit_pos
* won't be updated.
*/
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
/* set last_commit_pos to binlog_position */
spinlock_acquire(&router->binlog_lock);
router->binlog_position = router->current_pos;
spinlock_release(&router->binlog_lock);
} else {
/**
* A transaction is pending.
* Last_commit_pos is on hold.
*
* Log a message if the transaction was opened
* at binlog router start
*/
if (router->last_written == 0) {
fprintf(stderr, "*** Router started with an Open transaction at %lu / %lu\n", router->binlog_position, router->current_pos);
}
}
// #define SHOW_EVENTS
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
//fprintf(stderr, "*** binlog last_commit_pos = %lu, binlog_pos = %lu\n", router->binlog_position, router->current_pos);
/**
* Detect transactions in events
* Only complete transactions should be sent to sleves
*/
if (router->trx_safe) {
/**
* look for QUERY_EVENT [BEGIN / COMMIT] and XID_EVENT
*/
if(hdr.event_type == QUERY_EVENT) {
char *statement_sql;
int db_name_len, var_block_len, statement_len;
db_name_len = ptr[4+20+ 4 + 4];
var_block_len = ptr[4+20+ 4 + 4 + 1 + 2];
statement_len = len - (4+20+4+4+1+2+2+var_block_len+1+db_name_len);
statement_sql = calloc(1, statement_len+1);
strncpy(statement_sql, (char *)ptr+4+20+4+4+1+2+2+var_block_len+1+db_name_len, statement_len);
/* Check for BEGIN (it comes for START TRANSACTION too) */
if (strncmp(statement_sql, "BEGIN", 5) == 0) {
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction > 0) {
//fprintf(stderr, "*** A transaction is already open!!!!\n");
// stop replication ????
}
router->pending_transaction = 1;
spinlock_release(&router->binlog_lock);
//fprintf(stderr, "Transaction is starting @ %llu / %llu\n", router->binlog_position, router->current_pos);
}
/* Check for COMMIT in non transactional store engines */
if (strncmp(statement_sql, "COMMIT", 6) == 0) {
spinlock_acquire(&router->binlog_lock);
router->pending_transaction = 2;
spinlock_release(&router->binlog_lock);
//fprintf(stderr, "Transaction closed @ %llu / %llu\n", router->binlog_position, router->current_pos);
}
free(statement_sql);
}
/* Check for COMMIT in Transactional engines, i.e InnoDB */
if(hdr.event_type == XID_EVENT) {
uint64_t xid;
xid = extract_field(ptr+4+20, 64);
if (router->pending_transaction) {
spinlock_acquire(&router->binlog_lock);
router->pending_transaction = 3;
//fprintf(stderr, "Transaction XID closed @ %llu / %llu", router->binlog_position, router->current_pos);
spinlock_release(&router->binlog_lock);
}
}
}
event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
if (hdr.event_type >= 0 && hdr.event_type <= event_limit)
@ -1026,7 +1127,7 @@ int n_bufs = -1, pn_bufs = -1;
"Replication fake event. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->current_pos)));
router->stats.n_fakeevents++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
@ -1071,7 +1172,7 @@ int n_bufs = -1, pn_bufs = -1;
"Replication heartbeat. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->current_pos)));
router->stats.n_heartbeats++;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
@ -1080,6 +1181,8 @@ int n_bufs = -1, pn_bufs = -1;
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
/* current event is being written to disk file */
if (blr_write_binlog_record(router, &hdr, ptr) == 0)
{
/*
@ -1094,6 +1197,8 @@ int n_bufs = -1, pn_bufs = -1;
blr_master_delayed_connect(router);
return;
}
/* Check for rotete event */
if (hdr.event_type == ROTATE_EVENT)
{
if (!blr_rotate_event(router, ptr, &hdr))
@ -1111,7 +1216,76 @@ int n_bufs = -1, pn_bufs = -1;
return;
}
}
/**
* Distributing binlog events to slaves
* may depend on pending transaction
*/
if (router->trx_safe == 0 || (router->trx_safe && router->pending_transaction == 0)) {
spinlock_acquire(&router->binlog_lock);
router->binlog_position = router->current_pos;
spinlock_release(&router->binlog_lock);
/* Now distribute events */
blr_distribute_binlog_record(router, &hdr, ptr);
} else {
/**
* If transaction is closed:
*
* 1) read current binlog starting
* from router->last_commit_pos
*
* 2) distribute read event
*
* 3) if current pos = router->binlog_position
* update router->last_commit_pos
*
*/
if (router->pending_transaction > 1) {
unsigned long pos;
GWBUF *record;
uint8_t *raw_data;
REP_HEADER new_hdr;
int i=0;
//fprintf(stderr, "Time to feed slaves with complete transaction from %llu / %llu\n", router->binlog_position, router->current_pos);
spinlock_acquire(&router->binlog_lock);
pos = router->binlog_position;
spinlock_release(&router->binlog_lock);
while ((record = blr_read_events_from_pos(router, pos, &new_hdr)) != NULL) {
i++;
raw_data = GWBUF_DATA(record);
//fprintf(stderr, "Read event %i, last commit @ %lu / %lu. *** Distributing event: Type [%i], size %u: pos @ %llu (next is %llu)\n", i, pos, router->current_pos, new_hdr.event_type, new_hdr.event_size, router->binlog_position, new_hdr.next_pos);
/* distribute event */
blr_distribute_binlog_record(router, &new_hdr, raw_data);
spinlock_acquire(&router->binlog_lock);
pos = new_hdr.next_pos;
spinlock_release(&router->binlog_lock);
gwbuf_free(record);
}
spinlock_acquire(&router->binlog_lock);
router->binlog_position = router->current_pos;
router->pending_transaction = 0;
spinlock_release(&router->binlog_lock);
} else {
/* A transaction is still pending */
//fprintf(stderr, "A Transaction is still pending @ %llu, master is @ %llu\n", router->binlog_position, router->current_pos);
}
}
}
else
{
@ -1125,7 +1299,7 @@ int n_bufs = -1, pn_bufs = -1;
hdr.event_type,
hdr.event_size,
router->binlog_name,
router->binlog_position)));
router->current_pos)));
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
@ -1178,7 +1352,7 @@ int n_bufs = -1, pn_bufs = -1;
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
router->current_pos)));
router->stats.n_binlog_errors++;
}
@ -1574,3 +1748,146 @@ char *rval;
return rval;
}
/**
* Read a replication event form current opened binlog into a GWBUF structure.
*
* @param router The router instance
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
*/
GWBUF
*blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr) {
unsigned long long end_pos = 0;
struct stat statb;
uint8_t hdbuf[19];
uint8_t *data;
GWBUF *result;
int n;
/* Get current binnlog position */
end_pos = router->current_pos;
/* end of file reached, we're done */
if (pos == end_pos) {
return NULL;
}
/* error */
if (pos > end_pos)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Error: Reading saved events, the specified pos %lu "
"is ahead of current pos %lu for file %s",
pos, router->current_pos, router->binlog_name)));
return NULL;
}
/* Read the event header information from the file */
if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19)
{
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reading saved events: reached end of binlog file at %d.", pos)));
break;
case -1:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error: Reading saved events: failed to read binlog "
"file %s at position %d"
" (%s).", router->binlog_name,
pos, strerror(errno))));
if (errno == EBADF)
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error: Reading saved events: bad file descriptor for file %s"
", descriptor %d.",
router->binlog_name, router->binlog_fd)));
break;
default:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error: Reading saved events: short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
n, router->binlog_name, pos)));
break;
}
return NULL;
}
//fprintf(stderr, "blr_read_events_from_pos read %i bytes. ptr[0]=[%i]\n", n, hdbuf[0]);
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
//fprintf(stderr, ">>>> event type %i\n", hdr->event_type);
//fprintf(stderr, ">>>> event size %lu\n", hdr->event_size);
//fprintf(stderr, ">>>> event next_pos %lu\n", hdr->next_pos);
/* Add MariaDB 10 checks */
if (hdr->event_type > MAX_EVENT_TYPE)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error: Reading saved events: invalid event type 0x%x. "
"Binlog file is %s, position %d",
hdr->event_type,
router->binlog_name, pos)));
return NULL;
}
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error: Reading saved events: failed to allocate memory for binlog entry, "
"size %d at %d.",
hdr->event_size, pos)));
return NULL;
}
/* Copy event header*/
data = GWBUF_DATA(result);
memcpy(data, hdbuf, 19);
/* Read event data and put int into buffer after header */
if ((n = pread(router->binlog_fd, &data[19], hdr->event_size - 19, pos + 19)) != hdr->event_size - 19)
{
if (n == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error: Reading saved events: the event at %ld in %s. "
"%s, expected %d bytes.",
pos, router->binlog_name,
strerror(errno), hdr->event_size - 19)));
} else {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error: Reading saved events: short read when reading "
"the event at %ld in %s. "
"Expected %d bytes got %d bytes.",
pos, router->binlog_name, hdr->event_size - 19, n)));
if (end_pos - pos < hdr->event_size)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error: Reading saved events: binlog event "
"is close to the end of the binlog file, "
"current file size is %u.", end_pos)));
}
}
/* free buffer */
gwbuf_free(result);
return NULL;
}
// fprintf(stderr, "blr_read_events_from_pos read [%i] next bytes, raw data size is [%i]\n", n, hdr->event_size);
return result;
}

View File

@ -545,7 +545,7 @@ extern char *strcasestr();
free(query_text);
if (router->master_state == BLRM_SLAVE_STOPPED) {
char path[4097] = "";
char path[PATH_MAX + 1] = "";
char error_string[BINLOG_ERROR_MSG_LEN + 1] = "";
MASTER_SERVER_CFG *current_master = NULL;
int removed_cfg = 0;
@ -1013,7 +1013,9 @@ int len, file_len;
sprintf(file, "%s", router->binlog_name);
file_len = strlen(file);
sprintf(position, "%ld", router->binlog_position);
sprintf(position, "%lu", router->binlog_position);
len = 5 + file_len + strlen(position) + 1 + 3;
if ((pkt = gwbuf_alloc(len)) == NULL)
return 0;
@ -1125,7 +1127,12 @@ char *dyn_column=NULL;
strncpy((char *)ptr, column, col_len); // Result string
ptr += col_len;
sprintf(column, "%ld", router->binlog_position);
/* if router->trx_safe report current_pos*/
if (router->trx_safe)
sprintf(column, "%lu", router->current_pos);
else
sprintf(column, "%lu", router->binlog_position);
col_len = strlen(column);
*ptr++ = col_len; // Length of result string
strncpy((char *)ptr, column, col_len); // Result string
@ -2517,7 +2524,7 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
slave->dcb->remote,
router->service->dbref->server->name,
router->service->dbref->server->port,
router->binlog_name, router->binlog_position)));
router->binlog_name, router->current_pos)));
return blr_slave_send_ok(router, slave);
}
@ -2534,7 +2541,7 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
static int
blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
{
char path[4097]="";
char path[PATH_MAX+1]="";
int loaded;
/* if unconfigured return an error */
@ -2556,10 +2563,27 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
spinlock_release(&router->lock);
/* create a new binlog or just use current one */
if (strcmp(router->prevbinlog, router->binlog_name))
if (strcmp(router->prevbinlog, router->binlog_name)) {
if (router->trx_safe && router->pending_transaction) {
char msg[1024+1] = "";
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Warning: a transaction is still opened at pos %lu. "
"Current pos is %lu in file %s",
router->binlog_position,
router->current_pos, router->prevbinlog)));
snprintf(msg, 1024, "A transaction is still opened at pos %lu. Current pos is %lu in file %s", router->binlog_position, router->current_pos, router->prevbinlog);
blr_slave_send_error_packet(slave, msg, (unsigned int)1254, NULL);
return 1;
}
blr_file_new_binlog(router, router->binlog_name);
else
} else {
blr_file_use_binlog(router, router->binlog_name);
}
blr_start_master(router);
@ -2572,13 +2596,13 @@ blr_start_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
router->service->dbref->server->name,
router->service->dbref->server->port,
router->binlog_name,
router->binlog_position)));
router->current_pos)));
/* File path for router cached authentication data */
strcpy(path, router->binlogdir);
strncat(path, "/cache", 4096);
strncat(path, "/cache", PATH_MAX);
strncat(path, "/dbusers", 4096);
strncat(path, "/dbusers", PATH_MAX);
/* Try loading dbusers from configured backends */
loaded = load_mysql_users(router->service);
@ -2792,7 +2816,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
master_logfile,
4,
router->binlog_name,
router->binlog_position);
router->current_pos);
return_error = 1;
} else {
@ -2803,7 +2827,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
master_logfile,
4,
router->binlog_name,
router->binlog_position);
router->current_pos);
return_error = 1;
}
@ -2831,6 +2855,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
memset(router->binlog_name, '\0', sizeof(router->binlog_name));
strncpy(router->binlog_name, master_logfile, BINLOG_FNAMELEN);
router->current_pos = 4;
router->binlog_position = 4;
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s: New MASTER_LOG_FILE is [%s]",
@ -2860,13 +2885,13 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
}
} else {
if (pos > 0 && pos != router->binlog_position) {
if (pos > 0 && pos != router->current_pos) {
snprintf(error, BINLOG_ERROR_MSG_LEN, "Can not set MASTER_LOG_POS to %s: "
"Permitted binlog pos is %lu. Current master_log_file=%s, master_log_pos=%lu",
passed_pos,
router->binlog_position,
router->current_pos,
router->binlog_name,
router->binlog_position);
router->current_pos);
return_error = 1;
}
@ -2893,6 +2918,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
* Also set binlog name if UNCOFIGURED
*/
if (router->master_state == BLRM_UNCONFIGURED) {
router->current_pos = 4;
router->binlog_position = 4;
memset(router->binlog_name, '\0', sizeof(router->binlog_name));
strncpy(router->binlog_name, master_logfile, BINLOG_FNAMELEN);
@ -2904,7 +2930,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "%s: New MASTER_LOG_POS is [%u]",
router->service->name,
router->binlog_position)));
router->current_pos)));
if (master_log_pos)
free(master_log_pos);
@ -2967,7 +2993,7 @@ int blr_handle_change_master(ROUTER_INSTANCE* router, char *command, char *error
router->service->dbref->server->name,
router->service->dbref->server->port,
router->binlog_name,
router->binlog_position,
router->current_pos,
router->user)));
blr_master_free_config(current_master);
@ -3150,7 +3176,7 @@ char *blr_set_master_logfile(ROUTER_INSTANCE *router, char *command, char *error
router->fileroot,
next_binlog_seqname,
router->binlog_name,
router->binlog_position);
router->current_pos);
free(logfile);
@ -3180,7 +3206,7 @@ static void
blr_master_get_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *curr_master) {
curr_master->port = router->service->dbref->server->port;
curr_master->host = strdup(router->service->dbref->server->name);
curr_master->pos = router->binlog_position;
curr_master->pos = router->current_pos;
strncpy(curr_master->logfile, router->binlog_name, BINLOG_FNAMELEN);
curr_master->user = strdup(router->user);
curr_master->password = strdup(router->password);
@ -3230,6 +3256,7 @@ blr_master_set_empty_config(ROUTER_INSTANCE *router) {
server_update_address(router->service->dbref->server, "none");
server_update_port(router->service->dbref->server, (unsigned short)3306);
router->current_pos = 4;
router->binlog_position = 4;
strcpy(router->binlog_name, "");
}
@ -3244,7 +3271,7 @@ static void
blr_master_apply_config(ROUTER_INSTANCE *router, MASTER_SERVER_CFG *prev_master) {
server_update_address(router->service->dbref->server, prev_master->host);
server_update_port(router->service->dbref->server, prev_master->port);
router->binlog_position = prev_master->pos;
router->current_pos = prev_master->pos;
strcpy(router->binlog_name, prev_master->logfile);
if (router->user) {
free(router->user);