MXS-1209: MariaDB GTID registration: handling of skipped binlog files from a Fake ROTATE_EVENT

MariaDB GTID Master registration:
creating missing binlog files (with 4 byes) between current one and the
filename coming from ROTATE_EVENT.

blr_slave_binlog_dump() is also checking possible empty files.
This commit is contained in:
MassimilianoPinto
2017-05-12 17:18:38 +02:00
parent 61ffd3e0f0
commit 2c346ffc1b
3 changed files with 347 additions and 92 deletions

View File

@ -89,6 +89,7 @@
#include <zlib.h>
#include <maxscale/alloc.h>
extern void poll_fake_write_event(DCB *dcb);
static char* get_next_token(char *str, const char* delim, char **saveptr);
extern int load_mysql_users(SERV_LISTENER *listener);
extern void blr_master_close(ROUTER_INSTANCE* router);
@ -287,8 +288,14 @@ static bool blr_handle_admin_stmt(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave,
char *admin_stmt,
char *admin_options);
extern unsigned int blr_file_get_next_seqno(const char *filename);
extern uint32_t blr_slave_get_file_size(const char *filename);
static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave);
void poll_fake_write_event(DCB *dcb);
static inline void blr_get_file_fullpath(const char *binlog_file,
const char *root_dir,
char *full_path);
/**
* Process a request packet from the slave server.
@ -309,6 +316,7 @@ void poll_fake_write_event(DCB *dcb);
int
blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
int rv = 0;
if (slave->state < 0 || slave->state > BLRS_MAXSTATE)
{
MXS_ERROR("Invalid slave state machine state (%d) for binlog router.",
@ -322,96 +330,109 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
{
case COM_QUERY:
slave->stats.n_queries++;
return blr_slave_query(router, slave, queue);
rv = blr_slave_query(router, slave, queue);
break;
case COM_REGISTER_SLAVE:
if (router->master_state == BLRM_UNCONFIGURED)
{
char *err_msg = "Binlog router is not yet configured"
" for replication.";
slave->state = BLRS_ERRORED;
blr_slave_send_error_packet(slave,
"Binlog router is not yet configured for replication",
(unsigned int) 1597, NULL);
err_msg,
1597,
NULL);
MXS_ERROR("%s: Slave %s: Binlog router is not yet configured for replication",
MXS_ERROR("%s: Slave %s: %s",
router->service->name,
slave->dcb->remote);
slave->dcb->remote,
err_msg);
dcb_close(slave->dcb);
return 1;
rv = 1;
}
/*
* If Master is MariaDB10 don't allow registration from
* MariaDB/Mysql 5 Slaves
*/
if (router->mariadb10_compat && !slave->mariadb10_compat)
else if (router->mariadb10_compat && !slave->mariadb10_compat)
{
char *err_msg = "MariaDB 10 Slave is required"
" for Slave registration.";
/**
* If Master is MariaDB10 don't allow registration from
* MariaDB/Mysql 5 Slaves
*/
slave->state = BLRS_ERRORED;
/* Send error that stops slave replication */
blr_send_custom_error(slave->dcb,
++slave->seqno,
0,
"MariaDB 10 Slave is required for Slave registration",
err_msg,
"42000",
1064);
MXS_ERROR("%s: Slave %s: a MariaDB 10 Slave is required for Slave registration",
MXS_ERROR("%s: Slave %s: %s",
router->service->name,
slave->dcb->remote);
slave->dcb->remote,
err_msg);
dcb_close(slave->dcb);
return 1;
rv = 1;
}
else if (router->mariadb10_master_gtid && !slave->mariadb_gtid)
{
/**
* If GTID master replication is set
* only GTID slaves can continue the registration.
*/
char *err_msg = "MariaDB 10 Slave GTID is required"
" for Slave registration.";
slave->state = BLRS_ERRORED;
/* Send error that stops slave replication */
blr_send_custom_error(slave->dcb,
++slave->seqno,
0,
"MariaDB 10 Slave GTID is required for Slave registration.",
err_msg,
"HY000",
//BINLOG_FATAL_ERROR_READING);
1597);
MXS_ERROR("%s: Slave %s: a MariaDB 10 Slave GTID request"
" is needed for Slave registration."
MXS_ERROR("%s: Slave %s: %s"
" Please use: CHANGE MASTER TO master_use_gtid=slave_pos.",
router->service->name,
slave->dcb->remote);
slave->dcb->remote,
err_msg);
dcb_close(slave->dcb);
return 1;
rv = 1;
}
else
{
/* Master and Slave version OK: continue with slave registration */
return blr_slave_register(router, slave, queue);
rv = blr_slave_register(router, slave, queue);
}
break;
case COM_BINLOG_DUMP:
rv = blr_slave_binlog_dump(router, slave, queue);
if (rv && router->send_slave_heartbeat && slave->heartbeat > 0)
{
char task_name[BLRM_TASK_NAME_LEN + 1] = "";
snprintf(task_name,
BLRM_TASK_NAME_LEN,
"%s slaves heartbeat send",
router->service->name);
int rc = blr_slave_binlog_dump(router, slave, queue);
if (router->send_slave_heartbeat && rc && slave->heartbeat > 0)
{
snprintf(task_name,
BLRM_TASK_NAME_LEN,
"%s slaves heartbeat send",
router->service->name);
/* Add slave heartbeat check task with 1 second frequency */
hktask_add(task_name, blr_send_slave_heartbeat, router, 1);
}
return rc;
/* Add slave heartbeat check task with 1 second frequency */
hktask_add(task_name, blr_send_slave_heartbeat, router, 1);
}
break;
case COM_STATISTICS:
return blr_statistics(router, slave, queue);
rv = blr_statistics(router, slave, queue);
break;
case COM_PING:
return blr_ping(router, slave, queue);
rv = blr_ping(router, slave, queue);
break;
case COM_QUIT:
MXS_DEBUG("COM_QUIT received from slave with server_id %d",
slave->serverid);
return 1;
rv = 1;
break;
default:
blr_send_custom_error(slave->dcb, 1, 0,
"You have an error in your SQL syntax; Check the "
@ -421,7 +442,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
MYSQL_COMMAND(queue));
break;
}
return 0;
return rv;
}
/*
@ -1723,14 +1744,27 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
}
}
MXS_DEBUG("%s: COM_BINLOG_DUMP: binlog name '%s', length %lu, "
"from position %lu.", router->service->name,
slave->binlogfile, strlen(slave->binlogfile),
MXS_DEBUG("%s: Slave %s:%i, COM_BINLOG_DUMP: binlog name '%s', length %lu, "
"from position %lu.",
router->service->name,
slave->dcb->remote,
dcb_get_port(slave->dcb),
slave->binlogfile,
strlen(slave->binlogfile),
(unsigned long)slave->binlog_pos);
/* First reply starts from seq = 1 */
slave->seqno = 1;
/**
* Check whether the request file is empty
* and try using next file in sequence.
* If one or more files have been skipped then
* the slave->binlog_pos is set to 4 and
* slave->binlogname set to new filename.
*/
blr_slave_skip_empty_files(router, slave);
/* Build and send Fake Rotate Event */
if (!blr_send_connect_fake_rotate(router, slave))
{
@ -1792,8 +1826,12 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
blr_slave_read_ste(router, slave, fde_end_pos);
}
/* Add GTID_LIST Fake Event before sending any new event */
if (slave->mariadb10_compat &&
/**
* Add GTID_LIST Fake Event before sending any new event
* Note: slave->binlog_pos must not be 4
*/
if (slave->binlog_pos != 4 &&
slave->mariadb10_compat &&
slave->mariadb_gtid)
{
if (!blr_send_fake_gtid_list(slave,
@ -6193,7 +6231,7 @@ static GWBUF *blr_build_fake_rotate_event(ROUTER_SLAVE *slave,
*
* Default position is 4, default file is router->binlog_file.
*
* If req_file is false then the file to read data from
* If req_file is false then the file to read data from
* could be either router->binlog_file or the file the GTID
* belongs to.
*
@ -6958,7 +6996,7 @@ static bool blr_handle_set_stmt(ROUTER_INSTANCE *router,
heading[strlen(heading) - 1] = '\0';
if (!heading[0])
{
MXS_ERROR("Cannot request empty GTID righ now");
MXS_ERROR("Cannot request empty GTID right now");
blr_slave_send_error_packet(slave,
"Empty GTID not implemented righ now",
(unsigned int)1198, NULL);
@ -7417,3 +7455,81 @@ static bool blr_handle_admin_stmt(ROUTER_INSTANCE *router,
return false;
}
/**
* Skip reading empty binlog files (4 bytes only)
*
* @param router Current router instance
* @param slave Current connected slave
*/
static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave)
{
char binlog_file[BLRM_BINLOG_NAME_STR_LEN + 1];
char router_curr_file[BLRM_BINLOG_NAME_STR_LEN + 1];
char file_path[PATH_MAX + 1] = "";
unsigned int seqno;
bool skipped_files = false;
// Save the current router binlog filename
spinlock_acquire(&router->binlog_lock);
strcpy(router_curr_file, router->binlog_name);
spinlock_release(&router->binlog_lock);
// Set the starting filename
strcpy(binlog_file, slave->binlogfile);
// Get binlog filename full-path
blr_get_file_fullpath(binlog_file,
router->binlogdir,
file_path);
/**
* Set the next file in sequence if current file has 4 bytes size.
* Stop if the new file is the urrent binlog file.
*/
while (strcmp(binlog_file, router_curr_file) != 0 &&
blr_slave_get_file_size(file_path) == 4 &&
(seqno = blr_file_get_next_seqno(binlog_file)) > 0)
{
// Log skipped file
MXS_INFO("Slave %s:%i, skip reading empty file '%s' (4 bytes size).",
slave->dcb->remote,
dcb_get_port(slave->dcb),
binlog_file);
// Set next in sequence binlog file name
sprintf(binlog_file, BINLOG_NAMEFMT, router->fileroot, seqno);
// Get binlog file full-path
blr_get_file_fullpath(binlog_file,
router->binlogdir,
file_path);
skipped_files = true;
}
// One or more files skipped: set last found filename and pos = 4
if (skipped_files)
{
strcpy(slave->binlogfile, binlog_file);
slave->binlog_pos = 4;
}
}
/**
* Get the full path of a binlog filename.
*
* @param binlog_file The binlog filename
* @param root_dir The binlog storage directory
* @param full_path The output fullpahth name:
* the memory area must be preallocated.
*/
static inline void blr_get_file_fullpath(const char *binlog_file,
const char *root_dir,
char *full_path)
{
strcpy(full_path, root_dir);
strcat(full_path, "/");
strcat(full_path, binlog_file);
}